This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e242253 [#956] refactor: Changes the Boolean flag that determines 
whether a Node is healthy to a state (#959)
0e242253 is described below

commit 0e2422533b9e88209a62ca67714e78e798d85391
Author: yl09099 <[email protected]>
AuthorDate: Thu Jun 22 17:09:44 2023 +0800

    [#956] refactor: Changes the Boolean flag that determines whether a Node is 
healthy to a state (#959)
    
    ### What changes were proposed in this pull request?
    
    Replace the Boolean isHealthy flag on ServerNode with the 
ServerStatus.UNHEALTHY state
    
    ### Why are the changes needed?
    
    The unhealthy state should be one of the ServerStatus values rather than an isolated Boolean flag
    
    Fix: #956
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests
---
 .../coordinator/CoordinatorGrpcService.java        | 11 +++-
 .../org/apache/uniffle/coordinator/ServerNode.java | 18 ++----
 .../uniffle/coordinator/SimpleClusterManager.java  |  4 +-
 .../access/checker/AccessClusterLoadChecker.java   |  4 +-
 .../apache/uniffle/coordinator/ServerNodeTest.java | 14 ++---
 .../coordinator/SimpleClusterManagerTest.java      | 66 +++++++++++-----------
 .../checker/AccessClusterLoadCheckerTest.java      | 18 +++---
 .../checker/AccessQuotaCheckerTest.java            |  6 +-
 .../assignment/BasicAssignmentStrategyTest.java    | 17 +++---
 .../PartitionBalanceAssignmentStrategyTest.java    | 23 ++++----
 .../assignment/PartitionRangeAssignmentTest.java   |  2 +-
 .../ContinuousSelectPartitionStrategyTest.java     |  3 +-
 .../test/HealthCheckCoordinatorGrpcTest.java       |  8 +--
 .../org/apache/uniffle/test}/HealthCheckTest.java  | 31 +++++-----
 .../client/impl/grpc/CoordinatorGrpcClient.java    |  4 --
 .../client/request/RssSendHeartBeatRequest.java    |  7 ---
 .../org/apache/uniffle/server/HealthCheck.java     | 28 ++++++---
 .../apache/uniffle/server/RegisterHeartBeat.java   |  3 -
 .../org/apache/uniffle/server/ShuffleServer.java   | 36 ++++++------
 .../apache/uniffle/server/HealthyMockChecker.java  |  2 +-
 .../uniffle/server/UnHealthyMockChecker.java       |  2 +-
 21 files changed, 150 insertions(+), 157 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e012f03b..1ab782d7 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -364,11 +364,19 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
   }
 
   private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
+    ServerStatus serverStatus = request.hasStatus() ? 
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
     boolean isHealthy = true;
     if (request.hasIsHealthy()) {
       isHealthy = request.getIsHealthy().getValue();
+      /**
+       * Compatible with  older version
+       */
+      if (isHealthy) {
+        serverStatus = ServerStatus.ACTIVE;
+      } else {
+        serverStatus = ServerStatus.UNHEALTHY;
+      }
     }
-    ServerStatus serverStatus = request.hasStatus() ? 
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
     return new ServerNode(request.getServerId().getId(),
         request.getServerId().getIp(),
         request.getServerId().getPort(),
@@ -377,7 +385,6 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         request.getAvailableMemory(),
         request.getEventNumInFlush(),
         Sets.newHashSet(request.getTagsList()),
-        isHealthy,
         serverStatus,
         StorageInfoUtils.fromProto(request.getStorageInfoMap()),
         request.getServerId().getNettyPort());
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index 6cc720ed..cafa8b54 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -50,9 +50,8 @@ public class ServerNode implements Comparable<ServerNode> {
       long preAllocatedMemory,
       long availableMemory,
       int eventNumInFlush,
-      Set<String> tags,
-      boolean isHealthy) {
-    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
+      Set<String> tags) {
+    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags,
         ServerStatus.ACTIVE, Maps.newHashMap());
   }
 
@@ -65,9 +64,8 @@ public class ServerNode implements Comparable<ServerNode> {
       long availableMemory,
       int eventNumInFlush,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus status) {
-    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
+    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags,
         status, Maps.newHashMap());
   }
 
@@ -80,10 +78,9 @@ public class ServerNode implements Comparable<ServerNode> {
       long availableMemory,
       int eventNumInFlush,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus status,
       Map<String, StorageInfo> storageInfoMap) {
-    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
+    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags,
         status, storageInfoMap, -1);
   }
 
@@ -96,7 +93,6 @@ public class ServerNode implements Comparable<ServerNode> {
       long availableMemory,
       int eventNumInFlush,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus status,
       Map<String, StorageInfo> storageInfoMap,
       int nettyPort) {
@@ -109,7 +105,7 @@ public class ServerNode implements Comparable<ServerNode> {
     this.eventNumInFlush = eventNumInFlush;
     this.timestamp = System.currentTimeMillis();
     this.tags = tags;
-    this.status = isHealthy ? status : ServerStatus.UNHEALTHY;
+    this.status = status;
     this.storageInfo = storageInfoMap;
     if (nettyPort > 0) {
       this.nettyPort = nettyPort;
@@ -156,10 +152,6 @@ public class ServerNode implements Comparable<ServerNode> {
   public Set<String> getTags() {
     return tags;
   }
-
-  public boolean isHealthy() {
-    return this.status != ServerStatus.UNHEALTHY;
-  }
   
   public ServerStatus getStatus() {
     return status;
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index a11823ea..6729e8f3 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -135,7 +135,7 @@ public class SimpleClusterManager implements ClusterManager 
{
           sn.setStatus(ServerStatus.LOST);
           lostNodes.add(sn);
           unhealthyNodes.remove(sn);
-        } else if (!sn.isHealthy()) {
+        } else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
           LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
           unhealthyNodes.add(sn);
           lostNodes.remove(sn);
@@ -242,7 +242,7 @@ public class SimpleClusterManager implements ClusterManager 
{
       }
       if (!excludeNodes.contains(node.getId())
           && node.getTags().containsAll(requiredTags)
-          && node.isHealthy()) {
+          && ServerStatus.ACTIVE.equals(node.getStatus())) {
         availableNodes.add(node);
       }
     }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 67eb584d..093709a3 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.config.Reconfigurable;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
@@ -67,7 +68,8 @@ public class AccessClusterLoadChecker extends 
AbstractAccessChecker implements R
   public AccessCheckResult check(AccessInfo accessInfo) {
     Set<String> tags = accessInfo.getTags();
     List<ServerNode> servers = clusterManager.getServerList(tags);
-    int size = (int) 
servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count();
+    int size = (int) servers.stream().filter(serverNode -> 
serverNode.getStatus()
+        .equals(ServerStatus.ACTIVE)).filter(this::checkMemory).count();
 
     // If the hard constraint number exist, directly check it
     if (availableServerNumThreshold != -1 && size >= 
availableServerNumThreshold) {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
index f60ff497..7fc4041a 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
@@ -40,11 +40,11 @@ public class ServerNodeTest {
   public void compareTest() {
     Set<String> tags = Sets.newHashSet("test");
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-        10, tags, true);
+        10, tags);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-        10, tags, true);
+        10, tags);
     ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-        11, tags, true);
+        11, tags);
     List<ServerNode> nodes = Lists.newArrayList(sn1, sn2, sn3);
     Collections.sort(nodes);
     assertEquals("sn2", nodes.get(0).getId());
@@ -55,7 +55,7 @@ public class ServerNodeTest {
   @Test
   public void testStorageInfoOfServerNode() {
     Set<String> tags = Sets.newHashSet("tag");
-    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags, 
true);
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags);
     // default constructor creates ServerNode with zero size of LocalStorage
     assertEquals(0, sn1.getStorageInfo().size());
     Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
@@ -67,7 +67,7 @@ public class ServerNodeTest {
         StorageStatus.NORMAL);
     localStorageInfo.put("/mnt", info);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
-        true, ServerStatus.ACTIVE, localStorageInfo);
+        ServerStatus.ACTIVE, localStorageInfo);
     assertEquals(1, sn2.getStorageInfo().size());
   }
 
@@ -76,10 +76,10 @@ public class ServerNodeTest {
     Set<String> tags = Sets.newHashSet("tag");
     Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
     ServerNode sn1 = new ServerNode("sn1", "ip", 1, 100L, 50L, 20, 10, tags,
-        true, ServerStatus.ACTIVE, localStorageInfo);
+        ServerStatus.ACTIVE, localStorageInfo);
     assertEquals(sn1.getNettyPort(), -1);
     ServerNode sn2 = new ServerNode("sn2", "ip", 1, 100L, 50L, 20, 10, tags,
-        true, ServerStatus.ACTIVE, localStorageInfo, 2);
+        ServerStatus.ACTIVE, localStorageInfo, 2);
     assertEquals(sn2.getNettyPort(), 2);
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index e08193f6..1457847f 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -85,11 +85,11 @@ public class SimpleClusterManagerTest {
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
 
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, grpcTags, true);
+          10, grpcTags);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-          10, grpcTags, true);
+          10, grpcTags);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-          11, grpcTags, true);
+          11, grpcTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -100,11 +100,11 @@ public class SimpleClusterManagerTest {
 
       // tag changes
       sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, Sets.newHashSet("new_tag"), true);
+          10, Sets.newHashSet("new_tag"));
       sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-          10, Sets.newHashSet("test", "new_tag"), true);
+          10, Sets.newHashSet("test", "new_tag"));
       ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
-          10, grpcTags, true);
+          10, grpcTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn4);
@@ -137,11 +137,11 @@ public class SimpleClusterManagerTest {
     coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
     SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-        10, grpcTags, true);
+        10, grpcTags);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-        10, grpcTags, true);
+        10, grpcTags);
     ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-        11, grpcTags, true);
+        11, grpcTags);
     clusterManager.add(sn1);
     clusterManager.add(sn2);
     clusterManager.add(sn3);
@@ -154,7 +154,7 @@ public class SimpleClusterManagerTest {
     );
     // re-register sn3
     sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-        11, grpcTags, true);
+        11, grpcTags);
     clusterManager.add(sn3);
     Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
     await().atMost(1, TimeUnit.SECONDS).until(() -> { 
@@ -173,13 +173,13 @@ public class SimpleClusterManagerTest {
     coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
     SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-        10, grpcTags, true);
+        10, grpcTags);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-        10, grpcTags, true);
+        10, grpcTags);
     ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-        11, grpcTags, false);
+        11, grpcTags, ServerStatus.UNHEALTHY);
     ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
-        11, grpcTags, false);
+        11, grpcTags, ServerStatus.UNHEALTHY);
     clusterManager.add(sn1);
     clusterManager.add(sn2);
     clusterManager.add(sn3);
@@ -193,7 +193,7 @@ public class SimpleClusterManagerTest {
     });
     // Register unhealthy node sn3 again
     sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-        11, grpcTags, false);
+        11, grpcTags, ServerStatus.UNHEALTHY);
     clusterManager.add(sn3);
     Set<String> expectedIdsre = Sets.newHashSet("sn3");
     await().atMost(1, TimeUnit.SECONDS).until(() -> { 
@@ -215,13 +215,13 @@ public class SimpleClusterManagerTest {
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
 
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, nettyTags, true, ServerStatus.ACTIVE, 
JavaUtils.newConcurrentMap(), 1);
+          10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-          10, nettyTags, true, ServerStatus.ACTIVE, 
JavaUtils.newConcurrentMap(), 1);
+          10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-          11, nettyTags, true, ServerStatus.ACTIVE, 
JavaUtils.newConcurrentMap(), 1);
+          11, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
       ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
-          11, grpcTags, true);
+          11, grpcTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -241,12 +241,12 @@ public class SimpleClusterManagerTest {
 
       // tag changes
       sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE, 
JavaUtils.newConcurrentMap(), 1);
+          10, Sets.newHashSet("new_tag"), ServerStatus.ACTIVE, 
JavaUtils.newConcurrentMap(), 1);
       sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
           10, Sets.newHashSet("test", "new_tag"),
-          true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
+          ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
       sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
-          10, grpcTags, true);
+          10, grpcTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn4);
@@ -275,11 +275,11 @@ public class SimpleClusterManagerTest {
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, testTags, false);
+          10, testTags, ServerStatus.UNHEALTHY);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-          10, testTags, true);
+          10, testTags);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-          11, testTags, true);
+          11, testTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -303,7 +303,7 @@ public class SimpleClusterManagerTest {
   }
 
   private void addNode(String id, SimpleClusterManager clusterManager) {
-    ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, 
testTags, true);
+    ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, 
testTags);
     LOG.info("Add node " + node.getId() + " " + node.getTimestamp());
     clusterManager.add(node);
   }
@@ -337,11 +337,11 @@ public class SimpleClusterManagerTest {
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
       ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
-          10, testTags, true);
+          10, testTags);
       ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
-          10, testTags, true);
+          10, testTags);
       ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
-          11, testTags, true);
+          11, testTags);
       clusterManager.add(sn1);
       clusterManager.add(sn2);
       clusterManager.add(sn3);
@@ -369,13 +369,13 @@ public class SimpleClusterManagerTest {
 
     try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new 
Configuration())) {
       scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
-          10, testTags, true));
+          10, testTags));
       scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
-          10, testTags, true));
+          10, testTags));
       scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20,
-          10, testTags, true));
+          10, testTags));
       scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
-          10, testTags, true));
+          10, testTags));
       assertTrue(scm.getExcludeNodes().isEmpty());
 
       final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
index a48e69a4..eeb3b7d0 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.coordinator.AccessManager;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.ClusterManager;
@@ -75,8 +76,7 @@ public class AccessClusterLoadCheckerTest {
         20,
         1000,
         0,
-        null,
-        true);
+        null);
     ServerNode node2 = new ServerNode(
         "1",
         "1",
@@ -85,8 +85,7 @@ public class AccessClusterLoadCheckerTest {
         20,
         1000,
         0,
-        null,
-        true);
+        null);
     nodes.add(node1);
     nodes.add(node2);
 
@@ -155,7 +154,7 @@ public class AccessClusterLoadCheckerTest {
         30,
         0,
         null,
-        false);
+        ServerStatus.UNHEALTHY);
     serverNodeList.add(node1);
     final String filePath = Objects.requireNonNull(
         getClass().getClassLoader().getResource("coordinator.conf")).getFile();
@@ -178,8 +177,7 @@ public class AccessClusterLoadCheckerTest {
         40,
         10,
         0,
-        null,
-        true);
+        null);
     serverNodeList.add(node2);
     ServerNode node3 = new ServerNode(
         "1",
@@ -189,8 +187,7 @@ public class AccessClusterLoadCheckerTest {
         25,
         20,
         0,
-        null,
-        true);
+        null);
     serverNodeList.add(node3);
     assertFalse(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
     ServerNode node4 = new ServerNode(
@@ -201,8 +198,7 @@ public class AccessClusterLoadCheckerTest {
         25,
         25,
         0,
-        null,
-        true);
+        null);
     serverNodeList.add(node4);
     assertTrue(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
   }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
index 79aabb75..4d61a50b 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
@@ -72,8 +72,7 @@ public class AccessQuotaCheckerTest {
         20,
         1000,
         0,
-        null,
-        true);
+        null);
     ServerNode node2 = new ServerNode(
         "1",
         "1",
@@ -82,8 +81,7 @@ public class AccessQuotaCheckerTest {
         20,
         1000,
         0,
-        null,
-        true);
+        null);
     nodes.add(node1);
     nodes.add(node2);
 
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
index 03992edc..4ffdd407 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
@@ -68,7 +68,7 @@ public class BasicAssignmentStrategyTest {
   public void testAssign() {
     for (int i = 0; i < 20; ++i) {
       clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 
0, 0,
-          20 - i, 0, tags, true));
+          20 - i, 0, tags));
     }
 
     PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
@@ -95,7 +95,7 @@ public class BasicAssignmentStrategyTest {
   public void testRandomAssign() {
     for (int i = 0; i < 20; ++i) {
       clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 
0, 0,
-          0, 0, tags, true));
+          0, 0, tags));
     }
     PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
     SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
@@ -118,11 +118,11 @@ public class BasicAssignmentStrategyTest {
   @Test
   public void testAssignWithDifferentNodeNum() {
     final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0,
-        20, 0, tags, true);
+        20, 0, tags);
     final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0,
-        10, 0, tags, true);
+        10, 0, tags);
     final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0,
-        0, 0, tags, true);
+        0, 0, tags);
 
     clusterManager.add(sn1);
     PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
@@ -161,7 +161,7 @@ public class BasicAssignmentStrategyTest {
 
     for (int i = 0; i < 20; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
 
     /**
@@ -231,7 +231,7 @@ public class BasicAssignmentStrategyTest {
     serverTags = Sets.newHashSet("tag-2");
     for (int i = 0; i < shuffleNodesMax - 1; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
     assertEquals(
@@ -283,8 +283,7 @@ public class BasicAssignmentStrategyTest {
           5L,
           resources.get(i),
           5,
-          tags,
-          true);
+          tags);
       clusterManager.add(node);
     }
   }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index f4dcb259..19af678a 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -194,8 +194,7 @@ public class PartitionBalanceAssignmentStrategyTest {
           5L,
           resources.get(i),
           5,
-          tags,
-          true);
+          tags);
       clusterManager.add(node);
     }
   }
@@ -206,7 +205,7 @@ public class PartitionBalanceAssignmentStrategyTest {
 
     for (int i = 0; i < 20; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
 
     /**
@@ -276,7 +275,7 @@ public class PartitionBalanceAssignmentStrategyTest {
     serverTags = Sets.newHashSet("tag-2");
     for (int i = 0; i < shuffleNodesMax - 1; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
     assertEquals(
@@ -304,11 +303,11 @@ public class PartitionBalanceAssignmentStrategyTest {
 
     for (int i = 0; i < 5; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     for (int i = 0; i < 5; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
     pra.getAssignments().values().forEach((nodeList) -> {
@@ -349,11 +348,11 @@ public class PartitionBalanceAssignmentStrategyTest {
 
     for (int i = 0; i < 3; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     for (int i = 0; i < 2; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
     pra.getAssignments().values().forEach((nodeList) -> {
@@ -364,11 +363,11 @@ public class PartitionBalanceAssignmentStrategyTest {
     clusterManager = new SimpleClusterManager(ssc, new Configuration());
     for (int i = 0; i < 3; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     for (int i = 0; i < 2; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
     pra = strategy.assign(100, 1, 3, serverTags, -1, -1);
@@ -394,11 +393,11 @@ public class PartitionBalanceAssignmentStrategyTest {
 
     for (int i = 0; i < 3; ++i) {
       clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     for (int i = 0; i < 2; ++i) {
       clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
-          20 - i, 0, serverTags, true));
+          20 - i, 0, serverTags));
     }
     PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
     pra.getAssignments().values().forEach((nodeList) -> {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
index 4d3dc215..806801dd 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
@@ -40,7 +40,7 @@ public class PartitionRangeAssignmentTest {
     for (int i = 0; i < 9; i = i + 3) {
       PartitionRange range = new PartitionRange(i, i + 2);
       List<ServerNode> nodes = Collections.singletonList(new ServerNode(
-          String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, 
Sets.newHashSet("test"), true));
+          String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, 
Sets.newHashSet("test")));
       sortedMap.put(range, nodes);
     }
 
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
index 26329d7d..92615306 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
@@ -89,8 +89,7 @@ public class ContinuousSelectPartitionStrategyTest {
           5L,
           resources.get(i),
           5,
-          tags,
-          true);
+          tags);
       serverNodes.add(node);
     }
     return serverNodes;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
index 8338aa92..313a1a2b 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -41,7 +42,6 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase  {
@@ -113,7 +113,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase  {
         coordinatorClient.getShuffleAssignments(request);
     assertFalse(response.getPartitionToServers().isEmpty());
     for (ServerNode node : nodes) {
-      assertTrue(node.isHealthy());
+      assertEquals(ServerStatus.ACTIVE, node.getStatus());
     }
     byte[] bytes = new byte[writeDataSize];
     new Random().nextBytes(bytes);
@@ -125,7 +125,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase  {
     nodes  = coordinators.get(0).getClusterManager().list();
     assertEquals(2, nodes.size());
     for (ServerNode node : nodes) {
-      assertFalse(node.isHealthy());
+      assertEquals(ServerStatus.UNHEALTHY, node.getStatus());
     }
     nodes = coordinators.get(0).getClusterManager().getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
     assertEquals(0, nodes.size());
@@ -144,7 +144,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase  {
       }
     } while (nodes.size() != 2);
     for (ServerNode node : nodes) {
-      assertTrue(node.isHealthy());
+      assertEquals(ServerStatus.ACTIVE, node.getStatus());
     }
     assertEquals(2, nodes.size());
     response =
diff --git a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
similarity index 76%
rename from server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
rename to integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
index f1bbd0c6..c0b80013 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
@@ -15,24 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.server;
+package org.apache.uniffle.test;
 
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.server.HealthCheck;
+import org.apache.uniffle.server.HealthyMockChecker;
+import org.apache.uniffle.server.LocalStorageChecker;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.UnHealthyMockChecker;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class HealthCheckTest {
-
+public class HealthCheckTest extends CoordinatorTestBase {
   @BeforeAll
   public static void setup() {
     ShuffleServerMetrics.register();
@@ -73,26 +78,26 @@ public class HealthCheckTest {
   }
 
   @Test
-  public void checkTest() {
-    AtomicBoolean healthy = new AtomicBoolean(false);
+  public void checkTest() throws Exception {
     ShuffleServerConf conf = new ShuffleServerConf();
+    AtomicReference<ServerStatus> serverStatusAtomicReference = new AtomicReference(ServerStatus.ACTIVE);
     conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), HealthyMockChecker.class.getCanonicalName());
-    HealthCheck checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+    HealthCheck checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList());
     checker.check();
-    assertTrue(healthy.get());
+    assertEquals(ServerStatus.ACTIVE, checker.getServerStatus());
     assertEquals(0, ShuffleServerMetrics.gaugeIsHealthy.get());
 
     conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), UnHealthyMockChecker.class.getCanonicalName());
-    checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+    checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList());
     checker.check();
-    assertFalse(healthy.get());
+    assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus());
     assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get());
 
     conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
         UnHealthyMockChecker.class.getCanonicalName() + "," + HealthyMockChecker.class.getCanonicalName());
-    checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+    checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList());
     checker.check();
-    assertFalse(healthy.get());
+    assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus());
     assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get());
   }
 
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 4ecaf78f..08625404 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.protobuf.BoolValue;
 import com.google.protobuf.Empty;
 import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
@@ -117,7 +116,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
       int eventNumInFlush,
       long timeout,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo,
       int nettyPort) {
@@ -131,7 +129,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
             .setAvailableMemory(availableMemory)
             .setEventNumInFlush(eventNumInFlush)
             .addAllTags(tags)
-            .setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build())
             .setStatusValue(serverStatus.ordinal())
             .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
             .build();
@@ -197,7 +194,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
         request.getEventNumInFlush(),
         request.getTimeout(),
         request.getTags(),
-        request.isHealthy(),
         request.getServerStatus(),
         request.getStorageInfo(),
         request.getNettyPort());
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index afa5d0ed..a9d21377 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -35,7 +35,6 @@ public class RssSendHeartBeatRequest {
   private final int eventNumInFlush;
   private final Set<String> tags;
   private final long timeout;
-  private final boolean isHealthy;
   private final ServerStatus serverStatus;
   private final Map<String, StorageInfo> storageInfo;
   private final int nettyPort;
@@ -50,7 +49,6 @@ public class RssSendHeartBeatRequest {
       int eventNumInFlush,
       long timeout,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo,
       int nettyPort) {
@@ -63,7 +61,6 @@ public class RssSendHeartBeatRequest {
     this.eventNumInFlush = eventNumInFlush;
     this.tags = tags;
     this.timeout = timeout;
-    this.isHealthy = isHealthy;
     this.serverStatus = serverStatus;
     this.storageInfo = storageInfo;
     this.nettyPort = nettyPort;
@@ -105,10 +102,6 @@ public class RssSendHeartBeatRequest {
     return tags;
   }
 
-  public boolean isHealthy() {
-    return isHealthy;
-  }
-
   public ServerStatus getServerStatus() {
     return serverStatus;
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
index ecfa28b1..27a30851 100644
--- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
+++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
@@ -20,15 +20,16 @@ package org.apache.uniffle.server;
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ServerStatus;
+
 /**
  * HealthCheck will check every server whether it has the ability to process shuffle data. Currently, we only support
  * disk checker. If enough disks don't have enough disk space, server will become unhealthy, and only enough disks
@@ -38,14 +39,17 @@ public class HealthCheck {
 
   private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class);
 
-  private final AtomicBoolean isHealthy;
+  private AtomicReference<ServerStatus> serverStatus;
   private final long checkIntervalMs;
   private final Thread thread;
   private volatile boolean isStop = false;
   private List<Checker> checkers = Lists.newArrayList();
 
-  public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf, List<Checker> buildInCheckers) {
-    this.isHealthy = isHealthy;
+  public HealthCheck(
+      AtomicReference<ServerStatus> serverStatus,
+      ShuffleServerConf conf,
+      List<Checker> buildInCheckers) {
+    this.serverStatus = serverStatus;
     this.checkIntervalMs = conf.getLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL);
     List<String> configuredCheckers = conf.get(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
     if (CollectionUtils.isEmpty(configuredCheckers) && buildInCheckers.isEmpty()) {
@@ -78,17 +82,23 @@ public class HealthCheck {
     thread.setDaemon(true);
   }
 
-  @VisibleForTesting
-  void check() {
+
+  public void check() {
     for (Checker checker : checkers) {
       if (!checker.checkIsHealthy()) {
-        isHealthy.set(false);
+        serverStatus.set(ServerStatus.UNHEALTHY);
         ShuffleServerMetrics.gaugeIsHealthy.set(1);
         return;
       }
     }
     ShuffleServerMetrics.gaugeIsHealthy.set(0);
-    isHealthy.set(true);
+    if (serverStatus.get() == ServerStatus.UNHEALTHY) {
+      serverStatus.set(ServerStatus.ACTIVE);
+    }
+  }
+
+  public ServerStatus getServerStatus() {
+    return serverStatus.get();
   }
 
   public void start() {
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 893b9c55..56b3799d 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -79,7 +79,6 @@ public class RegisterHeartBeat {
             shuffleServer.getAvailableMemory(),
             shuffleServer.getEventNumInFlush(),
             shuffleServer.getTags(),
-            shuffleServer.isHealthy(),
             shuffleServer.getServerStatus(),
             shuffleServer.getStorageManager().getStorageInfo(),
             shuffleServer.getNettyPort());
@@ -100,7 +99,6 @@ public class RegisterHeartBeat {
       long availableMemory,
       int eventNumInFlush,
       Set<String> tags,
-      boolean isHealthy,
       ServerStatus serverStatus,
       Map<String, StorageInfo> localStorageInfo,
       int nettyPort) {
@@ -116,7 +114,6 @@ public class RegisterHeartBeat {
         eventNumInFlush,
         heartBeatInterval,
         tags,
-        isHealthy,
         serverStatus,
         localStorageInfo,
         nettyPort);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 212dd2d3..13a44009 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -87,10 +87,10 @@ public class ShuffleServer {
   private StorageManager storageManager;
   private HealthCheck healthCheck;
   private Set<String> tags = Sets.newHashSet();
-  private AtomicBoolean isHealthy = new AtomicBoolean(true);
   private GRPCMetrics grpcMetrics;
   private MetricReporter metricReporter;
-  private volatile ServerStatus serverStatus = ServerStatus.ACTIVE;
+
+  private AtomicReference<ServerStatus> serverStatus = new AtomicReference(ServerStatus.ACTIVE);
   private volatile boolean running;
   private ExecutorService executorService;
   private Future<?> decommissionFuture;
@@ -226,7 +226,7 @@ public class ShuffleServer {
     if (healthCheckEnable) {
       List<Checker> builtInCheckers = Lists.newArrayList();
       builtInCheckers.add(storageManager.getStorageChecker());
-      healthCheck = new HealthCheck(isHealthy, shuffleServerConf, builtInCheckers);
+      healthCheck = new HealthCheck(serverStatus, shuffleServerConf, builtInCheckers);
       healthCheck.start();
     }
 
@@ -315,7 +315,11 @@ public class ShuffleServer {
   }
 
   public ServerStatus getServerStatus() {
-    return serverStatus;
+    return serverStatus.get();
+  }
+
+  public void setServerStatus(ServerStatus serverStatus) {
+    this.serverStatus.set(serverStatus);
   }
 
   public synchronized void decommission() {
@@ -323,11 +327,11 @@ public class ShuffleServer {
       LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
       return;
     }
-    if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+    if (!ServerStatus.ACTIVE.equals(serverStatus.get())) {
       throw new InvalidRequestException(
           "Shuffle Server is processing other procedures, current status:" + serverStatus);
     }
-    serverStatus = ServerStatus.DECOMMISSIONING;
+    serverStatus.set(ServerStatus.DECOMMISSIONING);
     LOG.info("Shuffle Server is decommissioning.");
     if (executorService == null) {
       executorService = ThreadUtils.getDaemonSingleThreadExecutor("shuffle-server-decommission");
@@ -342,7 +346,7 @@ public class ShuffleServer {
     while (isDecommissioning()) {
       remainApplicationNum = shuffleTaskManager.getAppIds().size();
       if (remainApplicationNum == 0) {
-        serverStatus = ServerStatus.DECOMMISSIONED;
+        serverStatus.set(ServerStatus.DECOMMISSIONED);
         LOG.info("All applications finished. Current status is " + serverStatus);
         if (shutdownAfterDecommission) {
           LOG.info("Exiting...");
@@ -374,11 +378,11 @@ public class ShuffleServer {
       LOG.info("Shuffle server is not decommissioning. Nothing needs to be done.");
       return;
     }
-    if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
-      serverStatus = ServerStatus.ACTIVE;
+    if (ServerStatus.DECOMMISSIONED.equals(serverStatus.get())) {
+      serverStatus.set(ServerStatus.ACTIVE);
       return;
     }
-    serverStatus = ServerStatus.ACTIVE;
+    serverStatus.set(ServerStatus.ACTIVE);
     if (decommissionFuture.cancel(true)) {
       LOG.info("Decommission canceled.");
     } else {
@@ -453,13 +457,9 @@ public class ShuffleServer {
     return Collections.unmodifiableSet(tags);
   }
 
-  public boolean isHealthy() {
-    return isHealthy.get();
-  }
-
   @VisibleForTesting
   public void markUnhealthy() {
-    isHealthy.set(false);
+    serverStatus.set(ServerStatus.UNHEALTHY);
   }
 
   public GRPCMetrics getGrpcMetrics() {
@@ -467,8 +467,8 @@ public class ShuffleServer {
   }
 
   public boolean isDecommissioning() {
-    return ServerStatus.DECOMMISSIONING.equals(serverStatus)
-        || ServerStatus.DECOMMISSIONED.equals(serverStatus);
+    return ServerStatus.DECOMMISSIONING.equals(serverStatus.get())
+        || ServerStatus.DECOMMISSIONED.equals(serverStatus.get());
   }
 
   @VisibleForTesting
diff --git a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
index 18b16bb8..1fce9cd3 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.server;
 
-class HealthyMockChecker extends Checker {
+public class HealthyMockChecker extends Checker {
 
   @SuppressWarnings("checkstyle:RedundantModifier")
   public HealthyMockChecker(ShuffleServerConf conf) {
diff --git a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
index 1629335e..18e673fb 100644
--- a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
@@ -17,7 +17,7 @@
 
 package org.apache.uniffle.server;
 
-class UnHealthyMockChecker extends Checker {
+public class UnHealthyMockChecker extends Checker {
 
   @SuppressWarnings("checkstyle:RedundantModifier")
   public UnHealthyMockChecker(ShuffleServerConf conf) {

Reply via email to