This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/2.8.x by this push:
     new 6406335dd [#4485] Adjusting the address isolation logic of the servicecomb engine registration and config center (#4486)
6406335dd is described below

commit 6406335dd68ae0bee5422885cb793c6aca7733b2
Author: Alex <[email protected]>
AuthorDate: Mon Sep 2 19:55:59 2024 +0800

    [#4485] Adjusting the address isolation logic of the servicecomb engine registration and config center (#4486)
---
 .../http/client/common/AbstractAddressManager.java | 262 +++++++++------------
 .../client/common/AbstractAddressManagerTest.java  |  16 +-
 2 files changed, 114 insertions(+), 164 deletions(-)

diff --git a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
index 88d74510a..0b3b7b6f5 100644
--- a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
+++ b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
@@ -23,12 +23,12 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -40,8 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class AbstractAddressManager {
@@ -53,42 +51,40 @@ public class AbstractAddressManager {
 
   private static final String V3_PREFIX = "/v3/";
 
-  private static final int DEFAULT_METRICS_WINDOW_TIME = 1;
+  private static final int DEFAULT_ADDRESS_CHECK_TIME = 30;
 
   private static final int ISOLATION_THRESHOLD = 3;
 
-  private List<String> addresses = new ArrayList<>();
+  private volatile List<String> addresses = new ArrayList<>();
+
+  // when all addresses are isolation, it will use this for polling.
+  private final List<String> defaultAddress = new ArrayList<>();
+
+  private final List<String> defaultIsolationAddress = new ArrayList<>();
 
   private int index;
 
   private String projectName;
 
-  // if address in same zone will be true; others will be false.
-  private final Map<String, Boolean> addressCategory = new HashMap<>();
+  // all address list.
+  private final Set<String> addressCategory = new HashSet<>();
 
   // recording continuous times of failure of an address.
   private final Map<String, Integer> addressFailureStatus = new 
ConcurrentHashMap<>();
 
-  // recording address isolation status, if isolated will be false
-  private final Map<String, Boolean> addressIsolated = new 
ConcurrentHashMap<>();
-
-  // recording address isolation status, if isolated will be false
-  private Cache<String, Boolean> addressIsolationStatus = 
CacheBuilder.newBuilder()
-      .maximumSize(100)
-      .expireAfterWrite(1, TimeUnit.MINUTES)
-      .build();
-
   private volatile List<String> availableZone = new ArrayList<>();
 
+  private final List<String> isolationZoneAddress = new ArrayList<>();
+
   private volatile List<String> availableRegion = new ArrayList<>();
 
-  private final List<String> defaultAddress = new ArrayList<>();
+  private final List<String> isolationRegionAddress = new ArrayList<>();
 
   private boolean addressAutoRefreshed = false;
 
   private final Object lock = new Object();
 
-  private Random random = new Random();
+  private final Random random = new Random();
 
   private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1,
       new ThreadFactoryBuilder()
@@ -99,24 +95,34 @@ public class AbstractAddressManager {
     this.projectName = DEFAULT_PROJECT;
     this.addresses.addAll(addresses);
     this.defaultAddress.addAll(addresses);
-    this.index = addresses.size() > 0 ? random.nextInt(addresses.size()) : 0;
+    this.addressCategory.addAll(addresses);
+    this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
+    startCheck();
   }
 
   public AbstractAddressManager(String projectName, List<String> addresses) {
     this.projectName = StringUtils.isEmpty(projectName) ? DEFAULT_PROJECT : 
projectName;
     this.addresses = this.transformAddress(addresses);
-    this.defaultAddress.addAll(this.addresses);
-    this.index = addresses.size() > 0 ? random.nextInt(addresses.size()) : 0;
+    this.defaultAddress.addAll(addresses);
+    this.addressCategory.addAll(this.addresses);
+    this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
+    startCheck();
   }
 
-  @VisibleForTesting
-  Cache<String, Boolean> getAddressIsolationStatus() {
-    return addressIsolationStatus;
+  public void refreshEndpoint(RefreshEndpointEvent event, String key) {
+    if (null == event || !event.getName().equals(key)) {
+      return;
+    }
+
+    availableZone = 
event.getSameZone().stream().map(this::normalizeUri).collect(Collectors.toList());
+    availableRegion = 
event.getSameRegion().stream().map(this::normalizeUri).collect(Collectors.toList());
+    addressCategory.addAll(availableZone);
+    addressCategory.addAll(availableRegion);
+    addressAutoRefreshed = true;
   }
 
-  @VisibleForTesting
-  void setAddressIsolationStatus(Cache<String, Boolean> 
addressIsolationStatus) {
-    this.addressIsolationStatus = addressIsolationStatus;
+  protected String normalizeUri(String endpoint) {
+    return new URLEndPoint(endpoint).toString();
   }
 
   @VisibleForTesting
@@ -137,24 +143,13 @@ public class AbstractAddressManager {
   }
 
   private void startCheck() {
-    executorService.scheduleAtFixedRate(this::checkHistory,
-        0,
-        DEFAULT_METRICS_WINDOW_TIME,
-        TimeUnit.MINUTES);
+    executorService.scheduleAtFixedRate(this::checkHistory, 0, 
DEFAULT_ADDRESS_CHECK_TIME, TimeUnit.SECONDS);
   }
 
   public String formatUrl(String url, boolean absoluteUrl, String address) {
     return absoluteUrl ? address + url : formatAddress(address) + url;
   }
 
-  public String address() {
-    if (!addressAutoRefreshed) {
-      return getDefaultAddress();
-    } else {
-      return getAvailableZoneAddress();
-    }
-  }
-
   public boolean sslEnabled() {
     return address().startsWith("https://");
   }
@@ -163,10 +158,6 @@ public class AbstractAddressManager {
     return 
addresses.stream().map(this::formatAddress).collect(Collectors.toList());
   }
 
-  protected String getUrlPrefix(String address) {
-    return address + V3_PREFIX;
-  }
-
   protected String formatAddress(String address) {
     try {
       return getUrlPrefix(address) + 
HttpUtils.encodeURLParam(this.projectName);
@@ -175,27 +166,34 @@ public class AbstractAddressManager {
     }
   }
 
-  private String getDefaultAddress() {
-    List<String> addresses = getAvailableAddress(defaultAddress);
-    if (!addresses.isEmpty()) {
-      return getCurrentAddress(addresses);
+  protected String getUrlPrefix(String address) {
+    return address + V3_PREFIX;
+  }
+
+  public String address() {
+    if (!addressAutoRefreshed) {
+      return getDefaultAddress();
+    } else {
+      return getAvailableZoneAddress();
     }
-    return getInitAddress();
   }
 
-  private String getAvailableZoneAddress() {
-    List<String> addresses = getAvailableZoneIpPorts();
+  private String getDefaultAddress() {
     if (!addresses.isEmpty()) {
       return getCurrentAddress(addresses);
     }
-    return getInitAddress();
+    LOGGER.warn("all addresses are isolation, please check server status.");
+    // when all addresses are isolation, it will use all default address for 
polling.
+    return getCurrentAddress(new ArrayList<>(defaultAddress));
   }
 
-  // when all available address is fail, it will use all the initial addresses 
for polling.
-  private String getInitAddress() {
-    if (addresses.isEmpty()) {
-      return null;
+  private String getAvailableZoneAddress() {
+    List<String> zoneOrRegionAddress = getZoneOrRegionAddress();
+    if (!zoneOrRegionAddress.isEmpty()) {
+      return getCurrentAddress(zoneOrRegionAddress);
     }
+    LOGGER.warn("all auto discovery addresses are isolation, please check 
server status.");
+    // when all available address are isolation, it will use config addresses 
for polling.
     return getCurrentAddress(addresses);
   }
 
@@ -209,118 +207,88 @@ public class AbstractAddressManager {
     }
   }
 
-  private List<String> getAvailableZoneIpPorts() {
+  private List<String> getZoneOrRegionAddress() {
     List<String> results = new ArrayList<>();
     if (!availableZone.isEmpty()) {
-      results.addAll(getAvailableAddress(availableZone));
+      results.addAll(availableZone);
     } else {
-      results.addAll(getAvailableAddress(availableRegion));
+      results.addAll(availableRegion);
     }
     return results;
   }
 
-  private List<String> getAvailableAddress(List<String> endpoints) {
-    return endpoints.stream().filter(uri -> !addressIsolated.containsKey(uri) 
|| addressIsolated.get(uri))
-        .collect(Collectors.toList());
-  }
-
-  protected String normalizeUri(String endpoint) {
-    return new URLEndPoint(endpoint).toString();
-  }
-
-  public void refreshEndpoint(RefreshEndpointEvent event, String key) {
-    if (null == event || !event.getName().equals(key)) {
-      return;
-    }
-
-    availableZone = 
event.getSameZone().stream().map(this::normalizeUri).collect(Collectors.toList());
-    availableRegion = 
event.getSameRegion().stream().map(this::normalizeUri).collect(Collectors.toList());
-    availableZone.forEach(address -> addressCategory.put(address, true));
-    availableRegion.forEach(address -> addressCategory.put(address, false));
-    startCheck();
-    addressAutoRefreshed = true;
-  }
-
-  public void recordFailState(String address) {
-    synchronized (lock) {
-      if (!addressFailureStatus.containsKey(address)) {
-        addressFailureStatus.put(address, 1);
-        return;
-      }
-      int number = addressFailureStatus.get(address) + 1;
-      if (number < ISOLATION_THRESHOLD) {
-        addressFailureStatus.put(address, number);
-      } else {
-        removeAddress(address);
-      }
-    }
-  }
-
-  public void recordSuccessState(String address) {
-    addressFailureStatus.put(address, 0);
-  }
-
   @VisibleForTesting
   protected void checkHistory() {
-    addressIsolated.keySet().stream().filter(this::judgeIsolation).forEach(s 
-> {
-      if (telnetTest(s)) {
-        rejoinAddress(s);
+    addressCategory.forEach(address -> {
+      if (telnetTest(address)) {
+        // isolation addresses find address and restore it
+        findAndRestoreAddress(address);
       } else {
-        addressIsolationStatus.put(s, false);
+        recordFailState(address);
       }
     });
   }
 
-  private Boolean judgeIsolation(String address) {
-    try {
-      return addressIsolationStatus.get(address, () -> true);
-    } catch (ExecutionException e) {
-      return true;
-    }
-  }
-
   protected boolean telnetTest(String address) {
-    URI ipPort = parseIpPortFromURI(address);
+    URI uri = parseIpPortFromURI(address);
+    if (uri == null) {
+      return false;
+    }
     try (Socket s = new Socket()) {
-      s.connect(new InetSocketAddress(ipPort.getHost(), ipPort.getPort()), 
3000);
+      s.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), 3000);
       return true;
     } catch (IOException e) {
       LOGGER.warn("ping endpoint {} failed, It will be quarantined again.", 
address);
+      return false;
     }
-    return false;
   }
 
-  private URI parseIpPortFromURI(String uri) {
+  private URI parseIpPortFromURI(String address) {
     try {
-      return new URI(uri);
+      return new URI(address);
     } catch (URISyntaxException e) {
+      LOGGER.error("parse address [{}] failed.", address, e);
       return null;
     }
   }
 
-  //Query whether the current address belongs to the same AZ or region through 
azmap,
-  // add it to the sequence of, and delete the record in history
-  @VisibleForTesting
-  void rejoinAddress(String address) {
-    if (!addressAutoRefreshed) {
-      defaultAddress.add(address);
-      addressFailureStatus.put(address, 0);
-      addressIsolated.remove(address);
+  protected void findAndRestoreAddress(String address) {
+    recordSuccessState(address);
+    if (addressAutoRefreshed) {
+      if (isolationZoneAddress.remove(address)) {
+        LOGGER.warn("restore default address [{}]", address);
+        availableZone.add(address);
+        return;
+      }
+      if (isolationRegionAddress.remove(address)) {
+        LOGGER.warn("restore same zone address [{}]", address);
+        availableRegion.add(address);
+      }
       return;
     }
-
-    if (addressCategory.get(address) == null) {
-      LOGGER.warn("may not happen {}-{}", addressCategory.size(), address);
-      return;
+    if (defaultIsolationAddress.remove(address)) {
+      LOGGER.warn("restore same region address [{}]", address);
+      addresses.add(address);
     }
+  }
 
-    if (addressCategory.get(address)) {
-      availableZone.add(address);
-    } else {
-      availableRegion.add(address);
-    }
+  public void recordSuccessState(String address) {
     addressFailureStatus.put(address, 0);
-    addressIsolated.remove(address);
+  }
+
+  public void recordFailState(String address) {
+    synchronized (lock) {
+      if (!addressFailureStatus.containsKey(address)) {
+        addressFailureStatus.put(address, 1);
+        return;
+      }
+      int number = addressFailureStatus.get(address) + 1;
+      if (number < ISOLATION_THRESHOLD) {
+        addressFailureStatus.put(address, number);
+      } else {
+        removeAddress(address);
+      }
+    }
   }
 
   //Query whether the current address belongs to the same AZ or the same 
region through AZMap,
@@ -328,24 +296,20 @@ public class AbstractAddressManager {
   @VisibleForTesting
   void removeAddress(String address) {
     if (!addressAutoRefreshed) {
-      defaultAddress.remove(address);
-      addressIsolated.put(address, false);
-      addressIsolationStatus.put(address, false);
+      if (addresses.remove(address)) {
+        LOGGER.warn("isolation default address [{}]", address);
+        defaultIsolationAddress.add(address);
+      }
       return;
     }
-
-    if (addressCategory.get(address) == null) {
-      LOGGER.warn("may not happen {}-{}", addressCategory.size(), address);
+    if (availableZone.remove(address)) {
+      LOGGER.warn("isolation same zone address [{}]", address);
+      isolationZoneAddress.add(address);
       return;
     }
-
-    if (addressCategory.get(address)) {
-      availableZone.remove(address);
-    } else {
-      availableRegion.remove(address);
+    if (availableRegion.remove(address)) {
+      LOGGER.warn("isolation same region address [{}]", address);
+      isolationRegionAddress.add(address);
     }
-
-    addressIsolated.put(address, false);
-    addressIsolationStatus.put(address, false);
   }
 }
diff --git a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
index 6bd0c7328..9229baa7a 100644
--- a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
+++ b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
@@ -32,9 +32,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
 public class AbstractAddressManagerTest {
 
   private static final List<String> addresses = new ArrayList<>();
@@ -124,20 +121,9 @@ public class AbstractAddressManagerTest {
     addressManager.recordFailState(address);
     Assertions.assertEquals("http://127.0.0.4:30100", addressManager.address());
 
-    // mock cacheAddress status refresh after 10 minute
-    Cache<String, Boolean> cache = CacheBuilder.newBuilder()
-        .maximumSize(100)
-        .expireAfterWrite(10, TimeUnit.MINUTES)
-        .build();
-    cache.put("http://127.0.0.3:30100", true);
-
-    addressManager.setAddressIsolationStatus(cache);
-    Cache<String, Boolean> result = addressManager.getAddressIsolationStatus();
-    Assertions.assertEquals(true, result.get("http://127.0.0.3:30100", () -> false));
-
     // test restore isolation
     addressManager.checkHistory();
-    addressManager.rejoinAddress("http://127.0.0.3:30100");
+    addressManager.findAndRestoreAddress("http://127.0.0.3:30100");
     Assertions.assertEquals("http://127.0.0.3:30100", addressManager.address());
     Assertions.assertEquals("http://127.0.0.3:30100", addressManager.address());
   }

Reply via email to