This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/2.8.x by this push:
new 6406335dd [#4485] Adjusting the address isolation logic of the
servicecomb engine registration and config center (#4486)
6406335dd is described below
commit 6406335dd68ae0bee5422885cb793c6aca7733b2
Author: Alex <[email protected]>
AuthorDate: Mon Sep 2 19:55:59 2024 +0800
[#4485] Adjusting the address isolation logic of the servicecomb engine
registration and config center (#4486)
---
.../http/client/common/AbstractAddressManager.java | 262 +++++++++------------
.../client/common/AbstractAddressManagerTest.java | 16 +-
2 files changed, 114 insertions(+), 164 deletions(-)
diff --git
a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
index 88d74510a..0b3b7b6f5 100644
---
a/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
+++
b/clients/http-client-common/src/main/java/org/apache/servicecomb/http/client/common/AbstractAddressManager.java
@@ -23,12 +23,12 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -40,8 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class AbstractAddressManager {
@@ -53,42 +51,40 @@ public class AbstractAddressManager {
private static final String V3_PREFIX = "/v3/";
- private static final int DEFAULT_METRICS_WINDOW_TIME = 1;
+ private static final int DEFAULT_ADDRESS_CHECK_TIME = 30;
private static final int ISOLATION_THRESHOLD = 3;
- private List<String> addresses = new ArrayList<>();
+ private volatile List<String> addresses = new ArrayList<>();
+
+ // when all addresses are isolation, it will use this for polling.
+ private final List<String> defaultAddress = new ArrayList<>();
+
+ private final List<String> defaultIsolationAddress = new ArrayList<>();
private int index;
private String projectName;
- // if address in same zone will be true; others will be false.
- private final Map<String, Boolean> addressCategory = new HashMap<>();
+ // all address list.
+ private final Set<String> addressCategory = new HashSet<>();
// recording continuous times of failure of an address.
private final Map<String, Integer> addressFailureStatus = new
ConcurrentHashMap<>();
- // recording address isolation status, if isolated will be false
- private final Map<String, Boolean> addressIsolated = new
ConcurrentHashMap<>();
-
- // recording address isolation status, if isolated will be false
- private Cache<String, Boolean> addressIsolationStatus =
CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite(1, TimeUnit.MINUTES)
- .build();
-
private volatile List<String> availableZone = new ArrayList<>();
+ private final List<String> isolationZoneAddress = new ArrayList<>();
+
private volatile List<String> availableRegion = new ArrayList<>();
- private final List<String> defaultAddress = new ArrayList<>();
+ private final List<String> isolationRegionAddress = new ArrayList<>();
private boolean addressAutoRefreshed = false;
private final Object lock = new Object();
- private Random random = new Random();
+ private final Random random = new Random();
private final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
@@ -99,24 +95,34 @@ public class AbstractAddressManager {
this.projectName = DEFAULT_PROJECT;
this.addresses.addAll(addresses);
this.defaultAddress.addAll(addresses);
- this.index = addresses.size() > 0 ? random.nextInt(addresses.size()) : 0;
+ this.addressCategory.addAll(addresses);
+ this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
+ startCheck();
}
public AbstractAddressManager(String projectName, List<String> addresses) {
this.projectName = StringUtils.isEmpty(projectName) ? DEFAULT_PROJECT :
projectName;
this.addresses = this.transformAddress(addresses);
- this.defaultAddress.addAll(this.addresses);
- this.index = addresses.size() > 0 ? random.nextInt(addresses.size()) : 0;
+ this.defaultAddress.addAll(addresses);
+ this.addressCategory.addAll(this.addresses);
+ this.index = !addresses.isEmpty() ? random.nextInt(addresses.size()) : 0;
+ startCheck();
}
- @VisibleForTesting
- Cache<String, Boolean> getAddressIsolationStatus() {
- return addressIsolationStatus;
+ public void refreshEndpoint(RefreshEndpointEvent event, String key) {
+ if (null == event || !event.getName().equals(key)) {
+ return;
+ }
+
+ availableZone =
event.getSameZone().stream().map(this::normalizeUri).collect(Collectors.toList());
+ availableRegion =
event.getSameRegion().stream().map(this::normalizeUri).collect(Collectors.toList());
+ addressCategory.addAll(availableZone);
+ addressCategory.addAll(availableRegion);
+ addressAutoRefreshed = true;
}
- @VisibleForTesting
- void setAddressIsolationStatus(Cache<String, Boolean>
addressIsolationStatus) {
- this.addressIsolationStatus = addressIsolationStatus;
+ protected String normalizeUri(String endpoint) {
+ return new URLEndPoint(endpoint).toString();
}
@VisibleForTesting
@@ -137,24 +143,13 @@ public class AbstractAddressManager {
}
private void startCheck() {
- executorService.scheduleAtFixedRate(this::checkHistory,
- 0,
- DEFAULT_METRICS_WINDOW_TIME,
- TimeUnit.MINUTES);
+ executorService.scheduleAtFixedRate(this::checkHistory, 0,
DEFAULT_ADDRESS_CHECK_TIME, TimeUnit.SECONDS);
}
public String formatUrl(String url, boolean absoluteUrl, String address) {
return absoluteUrl ? address + url : formatAddress(address) + url;
}
- public String address() {
- if (!addressAutoRefreshed) {
- return getDefaultAddress();
- } else {
- return getAvailableZoneAddress();
- }
- }
-
public boolean sslEnabled() {
return address().startsWith("https://");
}
@@ -163,10 +158,6 @@ public class AbstractAddressManager {
return
addresses.stream().map(this::formatAddress).collect(Collectors.toList());
}
- protected String getUrlPrefix(String address) {
- return address + V3_PREFIX;
- }
-
protected String formatAddress(String address) {
try {
return getUrlPrefix(address) +
HttpUtils.encodeURLParam(this.projectName);
@@ -175,27 +166,34 @@ public class AbstractAddressManager {
}
}
- private String getDefaultAddress() {
- List<String> addresses = getAvailableAddress(defaultAddress);
- if (!addresses.isEmpty()) {
- return getCurrentAddress(addresses);
+ protected String getUrlPrefix(String address) {
+ return address + V3_PREFIX;
+ }
+
+ public String address() {
+ if (!addressAutoRefreshed) {
+ return getDefaultAddress();
+ } else {
+ return getAvailableZoneAddress();
}
- return getInitAddress();
}
- private String getAvailableZoneAddress() {
- List<String> addresses = getAvailableZoneIpPorts();
+ private String getDefaultAddress() {
if (!addresses.isEmpty()) {
return getCurrentAddress(addresses);
}
- return getInitAddress();
+ LOGGER.warn("all addresses are isolation, please check server status.");
+ // when all addresses are isolation, it will use all default address for
polling.
+ return getCurrentAddress(new ArrayList<>(defaultAddress));
}
- // when all available address is fail, it will use all the initial addresses
for polling.
- private String getInitAddress() {
- if (addresses.isEmpty()) {
- return null;
+ private String getAvailableZoneAddress() {
+ List<String> zoneOrRegionAddress = getZoneOrRegionAddress();
+ if (!zoneOrRegionAddress.isEmpty()) {
+ return getCurrentAddress(zoneOrRegionAddress);
}
+ LOGGER.warn("all auto discovery addresses are isolation, please check
server status.");
+ // when all available address are isolation, it will use config addresses
for polling.
return getCurrentAddress(addresses);
}
@@ -209,118 +207,88 @@ public class AbstractAddressManager {
}
}
- private List<String> getAvailableZoneIpPorts() {
+ private List<String> getZoneOrRegionAddress() {
List<String> results = new ArrayList<>();
if (!availableZone.isEmpty()) {
- results.addAll(getAvailableAddress(availableZone));
+ results.addAll(availableZone);
} else {
- results.addAll(getAvailableAddress(availableRegion));
+ results.addAll(availableRegion);
}
return results;
}
- private List<String> getAvailableAddress(List<String> endpoints) {
- return endpoints.stream().filter(uri -> !addressIsolated.containsKey(uri)
|| addressIsolated.get(uri))
- .collect(Collectors.toList());
- }
-
- protected String normalizeUri(String endpoint) {
- return new URLEndPoint(endpoint).toString();
- }
-
- public void refreshEndpoint(RefreshEndpointEvent event, String key) {
- if (null == event || !event.getName().equals(key)) {
- return;
- }
-
- availableZone =
event.getSameZone().stream().map(this::normalizeUri).collect(Collectors.toList());
- availableRegion =
event.getSameRegion().stream().map(this::normalizeUri).collect(Collectors.toList());
- availableZone.forEach(address -> addressCategory.put(address, true));
- availableRegion.forEach(address -> addressCategory.put(address, false));
- startCheck();
- addressAutoRefreshed = true;
- }
-
- public void recordFailState(String address) {
- synchronized (lock) {
- if (!addressFailureStatus.containsKey(address)) {
- addressFailureStatus.put(address, 1);
- return;
- }
- int number = addressFailureStatus.get(address) + 1;
- if (number < ISOLATION_THRESHOLD) {
- addressFailureStatus.put(address, number);
- } else {
- removeAddress(address);
- }
- }
- }
-
- public void recordSuccessState(String address) {
- addressFailureStatus.put(address, 0);
- }
-
@VisibleForTesting
protected void checkHistory() {
- addressIsolated.keySet().stream().filter(this::judgeIsolation).forEach(s
-> {
- if (telnetTest(s)) {
- rejoinAddress(s);
+ addressCategory.forEach(address -> {
+ if (telnetTest(address)) {
+ // isolation addresses find address and restore it
+ findAndRestoreAddress(address);
} else {
- addressIsolationStatus.put(s, false);
+ recordFailState(address);
}
});
}
- private Boolean judgeIsolation(String address) {
- try {
- return addressIsolationStatus.get(address, () -> true);
- } catch (ExecutionException e) {
- return true;
- }
- }
-
protected boolean telnetTest(String address) {
- URI ipPort = parseIpPortFromURI(address);
+ URI uri = parseIpPortFromURI(address);
+ if (uri == null) {
+ return false;
+ }
try (Socket s = new Socket()) {
- s.connect(new InetSocketAddress(ipPort.getHost(), ipPort.getPort()),
3000);
+ s.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), 3000);
return true;
} catch (IOException e) {
LOGGER.warn("ping endpoint {} failed, It will be quarantined again.",
address);
+ return false;
}
- return false;
}
- private URI parseIpPortFromURI(String uri) {
+ private URI parseIpPortFromURI(String address) {
try {
- return new URI(uri);
+ return new URI(address);
} catch (URISyntaxException e) {
+ LOGGER.error("parse address [{}] failed.", address, e);
return null;
}
}
- //Query whether the current address belongs to the same AZ or region through
azmap,
- // add it to the sequence of, and delete the record in history
- @VisibleForTesting
- void rejoinAddress(String address) {
- if (!addressAutoRefreshed) {
- defaultAddress.add(address);
- addressFailureStatus.put(address, 0);
- addressIsolated.remove(address);
+ protected void findAndRestoreAddress(String address) {
+ recordSuccessState(address);
+ if (addressAutoRefreshed) {
+ if (isolationZoneAddress.remove(address)) {
+ LOGGER.warn("restore default address [{}]", address);
+ availableZone.add(address);
+ return;
+ }
+ if (isolationRegionAddress.remove(address)) {
+ LOGGER.warn("restore same zone address [{}]", address);
+ availableRegion.add(address);
+ }
return;
}
-
- if (addressCategory.get(address) == null) {
- LOGGER.warn("may not happen {}-{}", addressCategory.size(), address);
- return;
+ if (defaultIsolationAddress.remove(address)) {
+ LOGGER.warn("restore same region address [{}]", address);
+ addresses.add(address);
}
+ }
- if (addressCategory.get(address)) {
- availableZone.add(address);
- } else {
- availableRegion.add(address);
- }
+ public void recordSuccessState(String address) {
addressFailureStatus.put(address, 0);
- addressIsolated.remove(address);
+ }
+
+ public void recordFailState(String address) {
+ synchronized (lock) {
+ if (!addressFailureStatus.containsKey(address)) {
+ addressFailureStatus.put(address, 1);
+ return;
+ }
+ int number = addressFailureStatus.get(address) + 1;
+ if (number < ISOLATION_THRESHOLD) {
+ addressFailureStatus.put(address, number);
+ } else {
+ removeAddress(address);
+ }
+ }
}
//Query whether the current address belongs to the same AZ or the same
region through AZMap,
@@ -328,24 +296,20 @@ public class AbstractAddressManager {
@VisibleForTesting
void removeAddress(String address) {
if (!addressAutoRefreshed) {
- defaultAddress.remove(address);
- addressIsolated.put(address, false);
- addressIsolationStatus.put(address, false);
+ if (addresses.remove(address)) {
+ LOGGER.warn("isolation default address [{}]", address);
+ defaultIsolationAddress.add(address);
+ }
return;
}
-
- if (addressCategory.get(address) == null) {
- LOGGER.warn("may not happen {}-{}", addressCategory.size(), address);
+ if (availableZone.remove(address)) {
+ LOGGER.warn("isolation same zone address [{}]", address);
+ isolationZoneAddress.add(address);
return;
}
-
- if (addressCategory.get(address)) {
- availableZone.remove(address);
- } else {
- availableRegion.remove(address);
+ if (availableRegion.remove(address)) {
+ LOGGER.warn("isolation same region address [{}]", address);
+ isolationRegionAddress.add(address);
}
-
- addressIsolated.put(address, false);
- addressIsolationStatus.put(address, false);
}
}
diff --git
a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
index 6bd0c7328..9229baa7a 100644
---
a/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
+++
b/clients/http-client-common/src/test/java/org/apache/servicecomb/http/client/common/AbstractAddressManagerTest.java
@@ -32,9 +32,6 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
public class AbstractAddressManagerTest {
private static final List<String> addresses = new ArrayList<>();
@@ -124,20 +121,9 @@ public class AbstractAddressManagerTest {
addressManager.recordFailState(address);
Assertions.assertEquals("http://127.0.0.4:30100",
addressManager.address());
- // mock cacheAddress status refresh after 10 minute
- Cache<String, Boolean> cache = CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite(10, TimeUnit.MINUTES)
- .build();
- cache.put("http://127.0.0.3:30100", true);
-
- addressManager.setAddressIsolationStatus(cache);
- Cache<String, Boolean> result = addressManager.getAddressIsolationStatus();
- Assertions.assertEquals(true, result.get("http://127.0.0.3:30100", () ->
false));
-
// test restore isolation
addressManager.checkHistory();
- addressManager.rejoinAddress("http://127.0.0.3:30100");
+ addressManager.findAndRestoreAddress("http://127.0.0.3:30100");
Assertions.assertEquals("http://127.0.0.3:30100",
addressManager.address());
Assertions.assertEquals("http://127.0.0.3:30100",
addressManager.address());
}