This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new f76e20895 Optimize discoveryManager to achieve code separation of
concerns (#4571)
f76e20895 is described below
commit f76e208953dcab00e19051f335fbc5941dd8eaf3
Author: Karson <[email protected]>
AuthorDate: Wed Oct 30 15:05:20 2024 +0800
Optimize discoveryManager to achieve code separation of concerns (#4571)
---
.../foundation/common/NamedThreadFactory.java | 4 +-
.../servicecomb/registry/DiscoveryManager.java | 149 ++++++++++-----------
2 files changed, 72 insertions(+), 81 deletions(-)
diff --git
a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
index c6be9c1e0..3b79c9c8e 100644
---
a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
+++
b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
@@ -43,7 +43,9 @@ public class NamedThreadFactory implements ThreadFactory {
*/
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
+ Thread thread = new Thread(r, prefix + "-" +
threadNumber.getAndIncrement());
+ thread.setDaemon(true);
+ return thread;
}
public String getPrefix() {
diff --git
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
index e5c6b8047..256b392de 100644
---
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
+++
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.servicecomb.foundation.common.NamedThreadFactory;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.utils.LambdaUtils;
@@ -57,6 +58,8 @@ public class DiscoveryManager implements LifeCycle {
private final InstancePing ping;
+ private final HealthCheckTask healthCheckTask = new HealthCheckTask();
+
// application:serviceName:instanceId
private final Map<String, Map<String, Map<String,
StatefulDiscoveryInstance>>>
allInstances = new ConcurrentHashMapEx<>();
@@ -76,91 +79,13 @@ public class DiscoveryManager implements LifeCycle {
discovery.setInstanceChangedListener(this::onInstancesChanged);
}
this.ping = pings.get(0);
- task = Executors.newScheduledThreadPool(1, (runnable) -> {
- Thread thread = new Thread(runnable, "discovery-manager-task") {
- @Override
- public void run() {
- try {
- runnable.run();
- } catch (Throwable e) {
- // This would never happen, because Worker will catch Throwable
and mute all tasks.
- LOGGER.error("discovery manager task error, not allowed please
fix. ", e);
- }
- }
- };
- thread.setPriority(Thread.MIN_PRIORITY);
- thread.setDaemon(true);
- return thread;
- });
+ task = Executors.newScheduledThreadPool(1, new
NamedThreadFactory("discovery-manager-task"));
}
public Discovery<? extends DiscoveryInstance> getPrimaryDiscovery() {
return this.discoveryList.get(0);
}
- private void doTask() {
- // doTask can not throw exception or will mute all tasks.
- try {
- doTaskImpl();
- } catch (Throwable e) {
- LOGGER.error("discovery manager task error. ", e);
- }
- }
-
- private void doTaskImpl() {
- Map<String, Map<String, List<String>>> removed = new HashMap<>();
- for (Entry<String, Map<String, Map<String, StatefulDiscoveryInstance>>>
apps : allInstances.entrySet()) {
- for (Entry<String, Map<String, StatefulDiscoveryInstance>> services :
apps.getValue().entrySet()) {
- boolean changed = false;
- for (StatefulDiscoveryInstance instance :
services.getValue().values()) {
- // check isolated time
- if (instance.getIsolationStatus() == IsolationStatus.ISOLATED &&
- instance.getIsolatedTime() + instance.getIsolateDuration() <
System.currentTimeMillis()) {
- instance.setIsolationStatus(IsolationStatus.NORMAL);
- changed = true;
- }
- // check ping status
- if (System.currentTimeMillis() - instance.getPingTime() > 60_000L) {
- boolean pingResult = ping.ping(instance);
- if (pingResult && instance.getPingStatus() != PingStatus.OK) {
- instance.setPingStatus(PingStatus.OK);
- changed = true;
- } else if (!pingResult && instance.getPingStatus() !=
PingStatus.FAIL) {
- instance.setPingStatus(PingStatus.FAIL);
- changed = true;
- }
- instance.setPingTime(System.currentTimeMillis());
- }
- // check unused
- if (instance.getHistoryStatus() == HistoryStatus.HISTORY) {
- if (instance.getStatus() != MicroserviceInstanceStatus.UP ||
- instance.getPingStatus() == PingStatus.FAIL ||
- instance.getIsolationStatus() == IsolationStatus.ISOLATED) {
- removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
- .computeIfAbsent(services.getKey(), k -> new
ArrayList<>()).add(instance.getInstanceId());
- LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
- apps.getKey(), services.getKey(), instance.getRegistryName(),
- instance.getInstanceId(), instance.getHistoryStatus(),
- instance.getStatus(), instance.getPingStatus(),
instance.getIsolationStatus());
- changed = true;
- }
- }
- }
- if (changed) {
- rebuildVersionCache(apps.getKey(), services.getKey());
- }
- }
- }
- // remove unused
- for (Entry<String, Map<String, List<String>>> apps : removed.entrySet()) {
- for (Entry<String, List<String>> services : apps.getValue().entrySet()) {
- for (String instance : services.getValue()) {
-
allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
- }
- }
- }
- }
-
private void onInstancesChanged(String application, String serviceName,
List<? extends DiscoveryInstance> instances) {
onInstancesChanged(null, application, serviceName, instances);
@@ -326,7 +251,7 @@ public class DiscoveryManager implements LifeCycle {
@Override
public void run() {
discoveryList.forEach(LifeCycle::run);
- task.scheduleWithFixedDelay(this::doTask, 3, 3, TimeUnit.SECONDS);
+ task.schedule(healthCheckTask, 3, TimeUnit.SECONDS);
}
@Override
@@ -345,4 +270,68 @@ public class DiscoveryManager implements LifeCycle {
});
return result.toString();
}
+
+ class HealthCheckTask implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ Map<String, Map<String, List<String>>> removed = new HashMap<>();
+ for (Entry<String, Map<String, Map<String,
StatefulDiscoveryInstance>>> apps :
DiscoveryManager.this.allInstances.entrySet()) {
+ for (Entry<String, Map<String, StatefulDiscoveryInstance>> services
: apps.getValue().entrySet()) {
+ boolean changed = false;
+ for (StatefulDiscoveryInstance instance :
services.getValue().values()) {
+ // check isolated time
+ if (instance.getIsolationStatus() == IsolationStatus.ISOLATED &&
+ instance.getIsolatedTime() + instance.getIsolateDuration() <
System.currentTimeMillis()) {
+ instance.setIsolationStatus(IsolationStatus.NORMAL);
+ changed = true;
+ }
+ // check ping status
+ if (System.currentTimeMillis() - instance.getPingTime() >
60_000L) {
+ boolean pingResult = DiscoveryManager.this.ping.ping(instance);
+ if (pingResult && instance.getPingStatus() != PingStatus.OK) {
+ instance.setPingStatus(PingStatus.OK);
+ changed = true;
+ } else if (!pingResult && instance.getPingStatus() !=
PingStatus.FAIL) {
+ instance.setPingStatus(PingStatus.FAIL);
+ changed = true;
+ }
+ instance.setPingTime(System.currentTimeMillis());
+ }
+ // check unused
+ if (instance.getHistoryStatus() == HistoryStatus.HISTORY) {
+ if (instance.getStatus() != MicroserviceInstanceStatus.UP ||
+ instance.getPingStatus() == PingStatus.FAIL ||
+ instance.getIsolationStatus() == IsolationStatus.ISOLATED)
{
+ removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
+ .computeIfAbsent(services.getKey(), k -> new
ArrayList<>()).add(instance.getInstanceId());
+ LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
+ apps.getKey(), services.getKey(),
instance.getRegistryName(),
+ instance.getInstanceId(), instance.getHistoryStatus(),
+ instance.getStatus(), instance.getPingStatus(),
instance.getIsolationStatus());
+ changed = true;
+ }
+ }
+ }
+ if (changed) {
+ rebuildVersionCache(apps.getKey(), services.getKey());
+ }
+ }
+ }
+ // remove unused
+ for (Entry<String, Map<String, List<String>>> apps :
removed.entrySet()) {
+ for (Entry<String, List<String>> services :
apps.getValue().entrySet()) {
+ for (String instance : services.getValue()) {
+
DiscoveryManager.this.allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ LOGGER.error("discovery manager task error. ", e);
+ } finally {
+ DiscoveryManager.this.task.schedule(this, 3, TimeUnit.SECONDS);
+ }
+ }
+ }
}