This is an automated email from the ASF dual-hosted git repository.

youling1128 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new f76e20895 Optimize discoveryManager to achieve code separation of concerns (#4571)
f76e20895 is described below

commit f76e208953dcab00e19051f335fbc5941dd8eaf3
Author: Karson <[email protected]>
AuthorDate: Wed Oct 30 15:05:20 2024 +0800

    Optimize discoveryManager to achieve code separation of concerns (#4571)
---
 .../foundation/common/NamedThreadFactory.java      |   4 +-
 .../servicecomb/registry/DiscoveryManager.java     | 149 ++++++++++-----------
 2 files changed, 72 insertions(+), 81 deletions(-)

diff --git 
a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
 
b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
index c6be9c1e0..3b79c9c8e 100644
--- 
a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
+++ 
b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java
@@ -43,7 +43,9 @@ public class NamedThreadFactory implements ThreadFactory {
    */
   @Override
   public Thread newThread(Runnable r) {
-    return new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
+    Thread thread =  new Thread(r, prefix + "-" + 
threadNumber.getAndIncrement());
+    thread.setDaemon(true); // daemon threads do not keep the JVM alive. NOTE(review): this now applies to every user of this factory — confirm intended
+    return thread;
   }
 
   public String getPrefix() {
diff --git 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
index e5c6b8047..256b392de 100644
--- 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
+++ 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.servicecomb.foundation.common.NamedThreadFactory;
 import org.apache.servicecomb.foundation.common.cache.VersionedCache;
 import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
 import org.apache.servicecomb.foundation.common.utils.LambdaUtils;
@@ -57,6 +58,8 @@ public class DiscoveryManager implements LifeCycle {
 
   private final InstancePing ping;
 
+  private final HealthCheckTask healthCheckTask = new HealthCheckTask();
+
   // application:serviceName:instanceId
   private final Map<String, Map<String, Map<String, 
StatefulDiscoveryInstance>>>
       allInstances = new ConcurrentHashMapEx<>();
@@ -76,91 +79,13 @@ public class DiscoveryManager implements LifeCycle {
       discovery.setInstanceChangedListener(this::onInstancesChanged);
     }
     this.ping = pings.get(0);
-    task = Executors.newScheduledThreadPool(1, (runnable) -> {
-      Thread thread = new Thread(runnable, "discovery-manager-task") {
-        @Override
-        public void run() {
-          try {
-            runnable.run();
-          } catch (Throwable e) {
-            // This would never happen, because Worker will catch Throwable 
and mute all tasks.
-            LOGGER.error("discovery manager task error, not allowed please 
fix. ", e);
-          }
-        }
-      };
-      thread.setPriority(Thread.MIN_PRIORITY);
-      thread.setDaemon(true);
-      return thread;
-    });
+    task = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("discovery-manager-task"));
   }
 
   public Discovery<? extends DiscoveryInstance> getPrimaryDiscovery() {
     return this.discoveryList.get(0);
   }
 
-  private void doTask() {
-    // doTask can not throw exception or will mute all tasks.
-    try {
-      doTaskImpl();
-    } catch (Throwable e) {
-      LOGGER.error("discovery manager task error. ", e);
-    }
-  }
-
-  private void doTaskImpl() {
-    Map<String, Map<String, List<String>>> removed = new HashMap<>();
-    for (Entry<String, Map<String, Map<String, StatefulDiscoveryInstance>>> 
apps : allInstances.entrySet()) {
-      for (Entry<String, Map<String, StatefulDiscoveryInstance>> services : 
apps.getValue().entrySet()) {
-        boolean changed = false;
-        for (StatefulDiscoveryInstance instance : 
services.getValue().values()) {
-          // check isolated time
-          if (instance.getIsolationStatus() == IsolationStatus.ISOLATED &&
-              instance.getIsolatedTime() + instance.getIsolateDuration() < 
System.currentTimeMillis()) {
-            instance.setIsolationStatus(IsolationStatus.NORMAL);
-            changed = true;
-          }
-          // check ping status
-          if (System.currentTimeMillis() - instance.getPingTime() > 60_000L) {
-            boolean pingResult = ping.ping(instance);
-            if (pingResult && instance.getPingStatus() != PingStatus.OK) {
-              instance.setPingStatus(PingStatus.OK);
-              changed = true;
-            } else if (!pingResult && instance.getPingStatus() != 
PingStatus.FAIL) {
-              instance.setPingStatus(PingStatus.FAIL);
-              changed = true;
-            }
-            instance.setPingTime(System.currentTimeMillis());
-          }
-          // check unused
-          if (instance.getHistoryStatus() == HistoryStatus.HISTORY) {
-            if (instance.getStatus() != MicroserviceInstanceStatus.UP ||
-                instance.getPingStatus() == PingStatus.FAIL ||
-                instance.getIsolationStatus() == IsolationStatus.ISOLATED) {
-              removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
-                  .computeIfAbsent(services.getKey(), k -> new 
ArrayList<>()).add(instance.getInstanceId());
-              LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
-                  apps.getKey(), services.getKey(), instance.getRegistryName(),
-                  instance.getInstanceId(), instance.getHistoryStatus(),
-                  instance.getStatus(), instance.getPingStatus(), 
instance.getIsolationStatus());
-              changed = true;
-            }
-          }
-        }
-        if (changed) {
-          rebuildVersionCache(apps.getKey(), services.getKey());
-        }
-      }
-    }
-    // remove unused
-    for (Entry<String, Map<String, List<String>>> apps : removed.entrySet()) {
-      for (Entry<String, List<String>> services : apps.getValue().entrySet()) {
-        for (String instance : services.getValue()) {
-          
allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
-        }
-      }
-    }
-  }
-
   private void onInstancesChanged(String application, String serviceName,
       List<? extends DiscoveryInstance> instances) {
     onInstancesChanged(null, application, serviceName, instances);
@@ -326,7 +251,7 @@ public class DiscoveryManager implements LifeCycle {
   @Override
   public void run() {
     discoveryList.forEach(LifeCycle::run);
-    task.scheduleWithFixedDelay(this::doTask, 3, 3, TimeUnit.SECONDS);
+    task.schedule(healthCheckTask, 3, TimeUnit.SECONDS); // one-shot: first pass after 3s; HealthCheckTask re-schedules itself in its finally block
   }
 
   @Override
@@ -345,4 +270,68 @@ public class DiscoveryManager implements LifeCycle {
     });
     return result.toString();
   }
+
+  class HealthCheckTask implements Runnable { // one maintenance pass over allInstances (isolation, ping, cleanup); re-schedules itself when done
+
+    @Override
+    public void run() {
+      try {
+        Map<String, Map<String, List<String>>> removed = new HashMap<>(); // application -> serviceName -> instance ids to drop after the scan
+        for (Entry<String, Map<String, Map<String, 
StatefulDiscoveryInstance>>> apps : 
DiscoveryManager.this.allInstances.entrySet()) {
+          for (Entry<String, Map<String, StatefulDiscoveryInstance>> services 
: apps.getValue().entrySet()) {
+            boolean changed = false; // set when any instance of this service changes state in this pass
+            for (StatefulDiscoveryInstance instance : 
services.getValue().values()) {
+              // check isolated time: back to NORMAL once isolatedTime + isolateDuration lies in the past
+              if (instance.getIsolationStatus() == IsolationStatus.ISOLATED &&
+                  instance.getIsolatedTime() + instance.getIsolateDuration() < 
System.currentTimeMillis()) {
+                instance.setIsolationStatus(IsolationStatus.NORMAL);
+                changed = true;
+              }
+              // check ping status: re-ping only when the last ping is more than 60_000 ms old
+              if (System.currentTimeMillis() - instance.getPingTime() > 
60_000L) {
+                boolean pingResult = DiscoveryManager.this.ping.ping(instance);
+                if (pingResult && instance.getPingStatus() != PingStatus.OK) {
+                  instance.setPingStatus(PingStatus.OK);
+                  changed = true;
+                } else if (!pingResult && instance.getPingStatus() != 
PingStatus.FAIL) {
+                  instance.setPingStatus(PingStatus.FAIL);
+                  changed = true;
+                }
+                instance.setPingTime(System.currentTimeMillis()); // recorded whether or not the status changed
+              }
+              // check unused: a HISTORY instance is dropped when it is not UP, failed ping, or is isolated
+              if (instance.getHistoryStatus() == HistoryStatus.HISTORY) {
+                if (instance.getStatus() != MicroserviceInstanceStatus.UP ||
+                    instance.getPingStatus() == PingStatus.FAIL ||
+                    instance.getIsolationStatus() == IsolationStatus.ISOLATED) 
{
+                  removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
+                      .computeIfAbsent(services.getKey(), k -> new 
ArrayList<>()).add(instance.getInstanceId());
+                  LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
+                      apps.getKey(), services.getKey(), 
instance.getRegistryName(),
+                      instance.getInstanceId(), instance.getHistoryStatus(),
+                      instance.getStatus(), instance.getPingStatus(), 
instance.getIsolationStatus());
+                  changed = true;
+                }
+              }
+            }
+            if (changed) {
+              rebuildVersionCache(apps.getKey(), services.getKey()); // runs before the collected ids are removed below
+            }
+          }
+        }
+        // remove unused: applied after the scan, using the ids collected in 'removed'
+        for (Entry<String, Map<String, List<String>>> apps : 
removed.entrySet()) {
+          for (Entry<String, List<String>> services : 
apps.getValue().entrySet()) {
+            for (String instance : services.getValue()) {
+              
DiscoveryManager.this.allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
+            }
+          }
+        }
+      } catch (Throwable e) { // a failed pass is only logged; the finally block still queues the next one
+        LOGGER.error("discovery manager task error. ", e);
+      } finally {
+        DiscoveryManager.this.task.schedule(this, 3, TimeUnit.SECONDS); // next pass 3s after this one ends. NOTE(review): may be rejected once the executor is shut down — confirm
+      }
+    }
+  }
 }

Reply via email to