This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0e242253 [#956] refactor: Changes the Boolean flag that determines
whether a Node is healthy to a state (#959)
0e242253 is described below
commit 0e2422533b9e88209a62ca67714e78e798d85391
Author: yl09099 <[email protected]>
AuthorDate: Thu Jun 22 17:09:44 2023 +0800
[#956] refactor: Changes the Boolean flag that determines whether a Node is
healthy to a state (#959)
### What changes were proposed in this pull request?
Replace the boolean `isHealthy` flag on ServerNode with the existing `ServerStatus`
state: an unhealthy server is now represented by `ServerStatus.UNHEALTHY`
### Why are the changes needed?
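The unhealthy condition should be expressed as one of the server states rather than as a separate, isolated boolean flag kept alongside the status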
Fix: #956
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, updated to the new ServerNode constructors and to status-based health assertions
---
.../coordinator/CoordinatorGrpcService.java | 11 +++-
.../org/apache/uniffle/coordinator/ServerNode.java | 18 ++----
.../uniffle/coordinator/SimpleClusterManager.java | 4 +-
.../access/checker/AccessClusterLoadChecker.java | 4 +-
.../apache/uniffle/coordinator/ServerNodeTest.java | 14 ++---
.../coordinator/SimpleClusterManagerTest.java | 66 +++++++++++-----------
.../checker/AccessClusterLoadCheckerTest.java | 18 +++---
.../checker/AccessQuotaCheckerTest.java | 6 +-
.../assignment/BasicAssignmentStrategyTest.java | 17 +++---
.../PartitionBalanceAssignmentStrategyTest.java | 23 ++++----
.../assignment/PartitionRangeAssignmentTest.java | 2 +-
.../ContinuousSelectPartitionStrategyTest.java | 3 +-
.../test/HealthCheckCoordinatorGrpcTest.java | 8 +--
.../org/apache/uniffle/test}/HealthCheckTest.java | 31 +++++-----
.../client/impl/grpc/CoordinatorGrpcClient.java | 4 --
.../client/request/RssSendHeartBeatRequest.java | 7 ---
.../org/apache/uniffle/server/HealthCheck.java | 28 ++++++---
.../apache/uniffle/server/RegisterHeartBeat.java | 3 -
.../org/apache/uniffle/server/ShuffleServer.java | 36 ++++++------
.../apache/uniffle/server/HealthyMockChecker.java | 2 +-
.../uniffle/server/UnHealthyMockChecker.java | 2 +-
21 files changed, 150 insertions(+), 157 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index e012f03b..1ab782d7 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -364,11 +364,19 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
}
private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
+ ServerStatus serverStatus = request.hasStatus() ?
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
boolean isHealthy = true;
if (request.hasIsHealthy()) {
isHealthy = request.getIsHealthy().getValue();
+ /**
+ * Compatible with older version
+ */
+ if (isHealthy) {
+ serverStatus = ServerStatus.ACTIVE;
+ } else {
+ serverStatus = ServerStatus.UNHEALTHY;
+ }
}
- ServerStatus serverStatus = request.hasStatus() ?
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
return new ServerNode(request.getServerId().getId(),
request.getServerId().getIp(),
request.getServerId().getPort(),
@@ -377,7 +385,6 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
request.getAvailableMemory(),
request.getEventNumInFlush(),
Sets.newHashSet(request.getTagsList()),
- isHealthy,
serverStatus,
StorageInfoUtils.fromProto(request.getStorageInfoMap()),
request.getServerId().getNettyPort());
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index 6cc720ed..cafa8b54 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -50,9 +50,8 @@ public class ServerNode implements Comparable<ServerNode> {
long preAllocatedMemory,
long availableMemory,
int eventNumInFlush,
- Set<String> tags,
- boolean isHealthy) {
- this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags, isHealthy,
+ Set<String> tags) {
+ this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags,
ServerStatus.ACTIVE, Maps.newHashMap());
}
@@ -65,9 +64,8 @@ public class ServerNode implements Comparable<ServerNode> {
long availableMemory,
int eventNumInFlush,
Set<String> tags,
- boolean isHealthy,
ServerStatus status) {
- this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags, isHealthy,
+ this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags,
status, Maps.newHashMap());
}
@@ -80,10 +78,9 @@ public class ServerNode implements Comparable<ServerNode> {
long availableMemory,
int eventNumInFlush,
Set<String> tags,
- boolean isHealthy,
ServerStatus status,
Map<String, StorageInfo> storageInfoMap) {
- this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags, isHealthy,
+ this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags,
status, storageInfoMap, -1);
}
@@ -96,7 +93,6 @@ public class ServerNode implements Comparable<ServerNode> {
long availableMemory,
int eventNumInFlush,
Set<String> tags,
- boolean isHealthy,
ServerStatus status,
Map<String, StorageInfo> storageInfoMap,
int nettyPort) {
@@ -109,7 +105,7 @@ public class ServerNode implements Comparable<ServerNode> {
this.eventNumInFlush = eventNumInFlush;
this.timestamp = System.currentTimeMillis();
this.tags = tags;
- this.status = isHealthy ? status : ServerStatus.UNHEALTHY;
+ this.status = status;
this.storageInfo = storageInfoMap;
if (nettyPort > 0) {
this.nettyPort = nettyPort;
@@ -156,10 +152,6 @@ public class ServerNode implements Comparable<ServerNode> {
public Set<String> getTags() {
return tags;
}
-
- public boolean isHealthy() {
- return this.status != ServerStatus.UNHEALTHY;
- }
public ServerStatus getStatus() {
return status;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index a11823ea..6729e8f3 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -135,7 +135,7 @@ public class SimpleClusterManager implements ClusterManager
{
sn.setStatus(ServerStatus.LOST);
lostNodes.add(sn);
unhealthyNodes.remove(sn);
- } else if (!sn.isHealthy()) {
+ } else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
unhealthyNodes.add(sn);
lostNodes.remove(sn);
@@ -242,7 +242,7 @@ public class SimpleClusterManager implements ClusterManager
{
}
if (!excludeNodes.contains(node.getId())
&& node.getTags().containsAll(requiredTags)
- && node.isHealthy()) {
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
availableNodes.add(node);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 67eb584d..093709a3 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.Reconfigurable;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
@@ -67,7 +68,8 @@ public class AccessClusterLoadChecker extends
AbstractAccessChecker implements R
public AccessCheckResult check(AccessInfo accessInfo) {
Set<String> tags = accessInfo.getTags();
List<ServerNode> servers = clusterManager.getServerList(tags);
- int size = (int)
servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count();
+ int size = (int) servers.stream().filter(serverNode ->
serverNode.getStatus()
+ .equals(ServerStatus.ACTIVE)).filter(this::checkMemory).count();
// If the hard constraint number exist, directly check it
if (availableServerNumThreshold != -1 && size >=
availableServerNumThreshold) {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
index f60ff497..7fc4041a 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
@@ -40,11 +40,11 @@ public class ServerNodeTest {
public void compareTest() {
Set<String> tags = Sets.newHashSet("test");
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, tags, true);
+ 10, tags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, tags, true);
+ 10, tags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, tags, true);
+ 11, tags);
List<ServerNode> nodes = Lists.newArrayList(sn1, sn2, sn3);
Collections.sort(nodes);
assertEquals("sn2", nodes.get(0).getId());
@@ -55,7 +55,7 @@ public class ServerNodeTest {
@Test
public void testStorageInfoOfServerNode() {
Set<String> tags = Sets.newHashSet("tag");
- ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags,
true);
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags);
// default constructor creates ServerNode with zero size of LocalStorage
assertEquals(0, sn1.getStorageInfo().size());
Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
@@ -67,7 +67,7 @@ public class ServerNodeTest {
StorageStatus.NORMAL);
localStorageInfo.put("/mnt", info);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
- true, ServerStatus.ACTIVE, localStorageInfo);
+ ServerStatus.ACTIVE, localStorageInfo);
assertEquals(1, sn2.getStorageInfo().size());
}
@@ -76,10 +76,10 @@ public class ServerNodeTest {
Set<String> tags = Sets.newHashSet("tag");
Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
ServerNode sn1 = new ServerNode("sn1", "ip", 1, 100L, 50L, 20, 10, tags,
- true, ServerStatus.ACTIVE, localStorageInfo);
+ ServerStatus.ACTIVE, localStorageInfo);
assertEquals(sn1.getNettyPort(), -1);
ServerNode sn2 = new ServerNode("sn2", "ip", 1, 100L, 50L, 20, 10, tags,
- true, ServerStatus.ACTIVE, localStorageInfo, 2);
+ ServerStatus.ACTIVE, localStorageInfo, 2);
assertEquals(sn2.getNettyPort(), 2);
}
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index e08193f6..1457847f 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -85,11 +85,11 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, true);
+ 11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -100,11 +100,11 @@ public class SimpleClusterManagerTest {
// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, Sets.newHashSet("new_tag"), true);
+ 10, Sets.newHashSet("new_tag"));
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, Sets.newHashSet("test", "new_tag"), true);
+ 10, Sets.newHashSet("test", "new_tag"));
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
- 10, grpcTags, true);
+ 10, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn4);
@@ -137,11 +137,11 @@ public class SimpleClusterManagerTest {
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, true);
+ 11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -154,7 +154,7 @@ public class SimpleClusterManagerTest {
);
// re-register sn3
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, true);
+ 11, grpcTags);
clusterManager.add(sn3);
Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
await().atMost(1, TimeUnit.SECONDS).until(() -> {
@@ -173,13 +173,13 @@ public class SimpleClusterManagerTest {
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, grpcTags, true);
+ 10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, false);
+ 11, grpcTags, ServerStatus.UNHEALTHY);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, false);
+ 11, grpcTags, ServerStatus.UNHEALTHY);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -193,7 +193,7 @@ public class SimpleClusterManagerTest {
});
// Register unhealthy node sn3 again
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, false);
+ 11, grpcTags, ServerStatus.UNHEALTHY);
clusterManager.add(sn3);
Set<String> expectedIdsre = Sets.newHashSet("sn3");
await().atMost(1, TimeUnit.SECONDS).until(() -> {
@@ -215,13 +215,13 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
+ 10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
+ 10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, nettyTags, true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
+ 11, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
- 11, grpcTags, true);
+ 11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -241,12 +241,12 @@ public class SimpleClusterManagerTest {
// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
+ 10, Sets.newHashSet("new_tag"), ServerStatus.ACTIVE,
JavaUtils.newConcurrentMap(), 1);
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, Sets.newHashSet("test", "new_tag"),
- true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
+ ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
- 10, grpcTags, true);
+ 10, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn4);
@@ -275,11 +275,11 @@ public class SimpleClusterManagerTest {
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, testTags, false);
+ 10, testTags, ServerStatus.UNHEALTHY);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, testTags, true);
+ 10, testTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, testTags, true);
+ 11, testTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -303,7 +303,7 @@ public class SimpleClusterManagerTest {
}
private void addNode(String id, SimpleClusterManager clusterManager) {
- ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10,
testTags, true);
+ ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10,
testTags);
LOG.info("Add node " + node.getId() + " " + node.getTimestamp());
clusterManager.add(node);
}
@@ -337,11 +337,11 @@ public class SimpleClusterManagerTest {
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
- 10, testTags, true);
+ 10, testTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
- 10, testTags, true);
+ 10, testTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
- 11, testTags, true);
+ 11, testTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
@@ -369,13 +369,13 @@ public class SimpleClusterManagerTest {
try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new
Configuration())) {
scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags));
scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags));
scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags));
scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
- 10, testTags, true));
+ 10, testTags));
assertTrue(scm.getExcludeNodes().isEmpty());
final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
index a48e69a4..eeb3b7d0 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
@@ -30,6 +30,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.ClusterManager;
@@ -75,8 +76,7 @@ public class AccessClusterLoadCheckerTest {
20,
1000,
0,
- null,
- true);
+ null);
ServerNode node2 = new ServerNode(
"1",
"1",
@@ -85,8 +85,7 @@ public class AccessClusterLoadCheckerTest {
20,
1000,
0,
- null,
- true);
+ null);
nodes.add(node1);
nodes.add(node2);
@@ -155,7 +154,7 @@ public class AccessClusterLoadCheckerTest {
30,
0,
null,
- false);
+ ServerStatus.UNHEALTHY);
serverNodeList.add(node1);
final String filePath = Objects.requireNonNull(
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
@@ -178,8 +177,7 @@ public class AccessClusterLoadCheckerTest {
40,
10,
0,
- null,
- true);
+ null);
serverNodeList.add(node2);
ServerNode node3 = new ServerNode(
"1",
@@ -189,8 +187,7 @@ public class AccessClusterLoadCheckerTest {
25,
20,
0,
- null,
- true);
+ null);
serverNodeList.add(node3);
assertFalse(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
ServerNode node4 = new ServerNode(
@@ -201,8 +198,7 @@ public class AccessClusterLoadCheckerTest {
25,
25,
0,
- null,
- true);
+ null);
serverNodeList.add(node4);
assertTrue(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
index 79aabb75..4d61a50b 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
@@ -72,8 +72,7 @@ public class AccessQuotaCheckerTest {
20,
1000,
0,
- null,
- true);
+ null);
ServerNode node2 = new ServerNode(
"1",
"1",
@@ -82,8 +81,7 @@ public class AccessQuotaCheckerTest {
20,
1000,
0,
- null,
- true);
+ null);
nodes.add(node1);
nodes.add(node2);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
index 03992edc..4ffdd407 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
@@ -68,7 +68,7 @@ public class BasicAssignmentStrategyTest {
public void testAssign() {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0,
0, 0,
- 20 - i, 0, tags, true));
+ 20 - i, 0, tags));
}
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
@@ -95,7 +95,7 @@ public class BasicAssignmentStrategyTest {
public void testRandomAssign() {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0,
0, 0,
- 0, 0, tags, true));
+ 0, 0, tags));
}
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
@@ -118,11 +118,11 @@ public class BasicAssignmentStrategyTest {
@Test
public void testAssignWithDifferentNodeNum() {
final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0,
- 20, 0, tags, true);
+ 20, 0, tags);
final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0,
- 10, 0, tags, true);
+ 10, 0, tags);
final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0,
- 0, 0, tags, true);
+ 0, 0, tags);
clusterManager.add(sn1);
PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
@@ -161,7 +161,7 @@ public class BasicAssignmentStrategyTest {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
/**
@@ -231,7 +231,7 @@ public class BasicAssignmentStrategyTest {
serverTags = Sets.newHashSet("tag-2");
for (int i = 0; i < shuffleNodesMax - 1; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
assertEquals(
@@ -283,8 +283,7 @@ public class BasicAssignmentStrategyTest {
5L,
resources.get(i),
5,
- tags,
- true);
+ tags);
clusterManager.add(node);
}
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index f4dcb259..19af678a 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -194,8 +194,7 @@ public class PartitionBalanceAssignmentStrategyTest {
5L,
resources.get(i),
5,
- tags,
- true);
+ tags);
clusterManager.add(node);
}
}
@@ -206,7 +205,7 @@ public class PartitionBalanceAssignmentStrategyTest {
for (int i = 0; i < 20; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
/**
@@ -276,7 +275,7 @@ public class PartitionBalanceAssignmentStrategyTest {
serverTags = Sets.newHashSet("tag-2");
for (int i = 0; i < shuffleNodesMax - 1; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
assertEquals(
@@ -304,11 +303,11 @@ public class PartitionBalanceAssignmentStrategyTest {
for (int i = 0; i < 5; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
for (int i = 0; i < 5; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
pra.getAssignments().values().forEach((nodeList) -> {
@@ -349,11 +348,11 @@ public class PartitionBalanceAssignmentStrategyTest {
for (int i = 0; i < 3; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
for (int i = 0; i < 2; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
pra.getAssignments().values().forEach((nodeList) -> {
@@ -364,11 +363,11 @@ public class PartitionBalanceAssignmentStrategyTest {
clusterManager = new SimpleClusterManager(ssc, new Configuration());
for (int i = 0; i < 3; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
for (int i = 0; i < 2; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
pra = strategy.assign(100, 1, 3, serverTags, -1, -1);
@@ -394,11 +393,11 @@ public class PartitionBalanceAssignmentStrategyTest {
for (int i = 0; i < 3; ++i) {
clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
for (int i = 0; i < 2; ++i) {
clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0,
- 20 - i, 0, serverTags, true));
+ 20 - i, 0, serverTags));
}
PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
pra.getAssignments().values().forEach((nodeList) -> {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
index 4d3dc215..806801dd 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
@@ -40,7 +40,7 @@ public class PartitionRangeAssignmentTest {
for (int i = 0; i < 9; i = i + 3) {
PartitionRange range = new PartitionRange(i, i + 2);
List<ServerNode> nodes = Collections.singletonList(new ServerNode(
- String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0,
Sets.newHashSet("test"), true));
+ String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0,
Sets.newHashSet("test")));
sortedMap.put(range, nodes);
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
index 26329d7d..92615306 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
@@ -89,8 +89,7 @@ public class ContinuousSelectPartitionStrategyTest {
5L,
resources.get(i),
5,
- tags,
- true);
+ tags);
serverNodes.add(node);
}
return serverNodes;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
index 8338aa92..313a1a2b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -41,7 +42,6 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
@@ -113,7 +113,7 @@ public class HealthCheckCoordinatorGrpcTest extends
CoordinatorTestBase {
coordinatorClient.getShuffleAssignments(request);
assertFalse(response.getPartitionToServers().isEmpty());
for (ServerNode node : nodes) {
- assertTrue(node.isHealthy());
+ assertEquals(ServerStatus.ACTIVE, node.getStatus());
}
byte[] bytes = new byte[writeDataSize];
new Random().nextBytes(bytes);
@@ -125,7 +125,7 @@ public class HealthCheckCoordinatorGrpcTest extends
CoordinatorTestBase {
nodes = coordinators.get(0).getClusterManager().list();
assertEquals(2, nodes.size());
for (ServerNode node : nodes) {
- assertFalse(node.isHealthy());
+ assertEquals(ServerStatus.UNHEALTHY, node.getStatus());
}
nodes =
coordinators.get(0).getClusterManager().getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION));
assertEquals(0, nodes.size());
@@ -144,7 +144,7 @@ public class HealthCheckCoordinatorGrpcTest extends
CoordinatorTestBase {
}
} while (nodes.size() != 2);
for (ServerNode node : nodes) {
- assertTrue(node.isHealthy());
+ assertEquals(ServerStatus.ACTIVE, node.getStatus());
}
assertEquals(2, nodes.size());
response =
diff --git
a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
similarity index 76%
rename from server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
rename to
integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
index f1bbd0c6..c0b80013 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java
@@ -15,24 +15,29 @@
* limitations under the License.
*/
-package org.apache.uniffle.server;
+package org.apache.uniffle.test;
import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.server.HealthCheck;
+import org.apache.uniffle.server.HealthyMockChecker;
+import org.apache.uniffle.server.LocalStorageChecker;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.UnHealthyMockChecker;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class HealthCheckTest {
-
+public class HealthCheckTest extends CoordinatorTestBase {
@BeforeAll
public static void setup() {
ShuffleServerMetrics.register();
@@ -73,26 +78,26 @@ public class HealthCheckTest {
}
@Test
- public void checkTest() {
- AtomicBoolean healthy = new AtomicBoolean(false);
+ public void checkTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
+ AtomicReference<ServerStatus> serverStatusAtomicReference = new
AtomicReference(ServerStatus.ACTIVE);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
HealthyMockChecker.class.getCanonicalName());
- HealthCheck checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+ HealthCheck checker = new HealthCheck(serverStatusAtomicReference, conf,
Lists.newArrayList());
checker.check();
- assertTrue(healthy.get());
+ assertEquals(ServerStatus.ACTIVE, checker.getServerStatus());
assertEquals(0, ShuffleServerMetrics.gaugeIsHealthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
UnHealthyMockChecker.class.getCanonicalName());
- checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+ checker = new HealthCheck(serverStatusAtomicReference, conf,
Lists.newArrayList());
checker.check();
- assertFalse(healthy.get());
+ assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus());
assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get());
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
UnHealthyMockChecker.class.getCanonicalName() + "," +
HealthyMockChecker.class.getCanonicalName());
- checker = new HealthCheck(healthy, conf, Lists.newArrayList());
+ checker = new HealthCheck(serverStatusAtomicReference, conf,
Lists.newArrayList());
checker.check();
- assertFalse(healthy.get());
+ assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus());
assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get());
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 4ecaf78f..08625404 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.protobuf.BoolValue;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
@@ -117,7 +116,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
int eventNumInFlush,
long timeout,
Set<String> tags,
- boolean isHealthy,
ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo,
int nettyPort) {
@@ -131,7 +129,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
.setAvailableMemory(availableMemory)
.setEventNumInFlush(eventNumInFlush)
.addAllTags(tags)
- .setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build())
.setStatusValue(serverStatus.ordinal())
.putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
.build();
@@ -197,7 +194,6 @@ public class CoordinatorGrpcClient extends GrpcClient implements CoordinatorClie
request.getEventNumInFlush(),
request.getTimeout(),
request.getTags(),
- request.isHealthy(),
request.getServerStatus(),
request.getStorageInfo(),
request.getNettyPort());
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index afa5d0ed..a9d21377 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -35,7 +35,6 @@ public class RssSendHeartBeatRequest {
private final int eventNumInFlush;
private final Set<String> tags;
private final long timeout;
- private final boolean isHealthy;
private final ServerStatus serverStatus;
private final Map<String, StorageInfo> storageInfo;
private final int nettyPort;
@@ -50,7 +49,6 @@ public class RssSendHeartBeatRequest {
int eventNumInFlush,
long timeout,
Set<String> tags,
- boolean isHealthy,
ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo,
int nettyPort) {
@@ -63,7 +61,6 @@ public class RssSendHeartBeatRequest {
this.eventNumInFlush = eventNumInFlush;
this.tags = tags;
this.timeout = timeout;
- this.isHealthy = isHealthy;
this.serverStatus = serverStatus;
this.storageInfo = storageInfo;
this.nettyPort = nettyPort;
@@ -105,10 +102,6 @@ public class RssSendHeartBeatRequest {
return tags;
}
- public boolean isHealthy() {
- return isHealthy;
- }
-
public ServerStatus getServerStatus() {
return serverStatus;
}
diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
index ecfa28b1..27a30851 100644
--- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
+++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java
@@ -20,15 +20,16 @@ package org.apache.uniffle.server;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ServerStatus;
+
/**
* HealthCheck will check every server whether it has the ability to process
shuffle data. Currently, we only support
* disk checker. If enough disks don't have enough disk space, server will
become unhealthy, and only enough disks
@@ -38,14 +39,17 @@ public class HealthCheck {
private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class);
- private final AtomicBoolean isHealthy;
+ private AtomicReference<ServerStatus> serverStatus;
private final long checkIntervalMs;
private final Thread thread;
private volatile boolean isStop = false;
private List<Checker> checkers = Lists.newArrayList();
- public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf,
List<Checker> buildInCheckers) {
- this.isHealthy = isHealthy;
+ public HealthCheck(
+ AtomicReference<ServerStatus> serverStatus,
+ ShuffleServerConf conf,
+ List<Checker> buildInCheckers) {
+ this.serverStatus = serverStatus;
this.checkIntervalMs =
conf.getLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL);
List<String> configuredCheckers =
conf.get(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES);
if (CollectionUtils.isEmpty(configuredCheckers) &&
buildInCheckers.isEmpty()) {
@@ -78,17 +82,23 @@ public class HealthCheck {
thread.setDaemon(true);
}
- @VisibleForTesting
- void check() {
+
+ public void check() {
for (Checker checker : checkers) {
if (!checker.checkIsHealthy()) {
- isHealthy.set(false);
+ serverStatus.set(ServerStatus.UNHEALTHY);
ShuffleServerMetrics.gaugeIsHealthy.set(1);
return;
}
}
ShuffleServerMetrics.gaugeIsHealthy.set(0);
- isHealthy.set(true);
+ if (serverStatus.get() == ServerStatus.UNHEALTHY) {
+ serverStatus.set(ServerStatus.ACTIVE);
+ }
+ }
+
+ public ServerStatus getServerStatus() {
+ return serverStatus.get();
}
public void start() {
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 893b9c55..56b3799d 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -79,7 +79,6 @@ public class RegisterHeartBeat {
shuffleServer.getAvailableMemory(),
shuffleServer.getEventNumInFlush(),
shuffleServer.getTags(),
- shuffleServer.isHealthy(),
shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort());
@@ -100,7 +99,6 @@ public class RegisterHeartBeat {
long availableMemory,
int eventNumInFlush,
Set<String> tags,
- boolean isHealthy,
ServerStatus serverStatus,
Map<String, StorageInfo> localStorageInfo,
int nettyPort) {
@@ -116,7 +114,6 @@ public class RegisterHeartBeat {
eventNumInFlush,
heartBeatInterval,
tags,
- isHealthy,
serverStatus,
localStorageInfo,
nettyPort);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 212dd2d3..13a44009 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -87,10 +87,10 @@ public class ShuffleServer {
private StorageManager storageManager;
private HealthCheck healthCheck;
private Set<String> tags = Sets.newHashSet();
- private AtomicBoolean isHealthy = new AtomicBoolean(true);
private GRPCMetrics grpcMetrics;
private MetricReporter metricReporter;
- private volatile ServerStatus serverStatus = ServerStatus.ACTIVE;
+
+ private AtomicReference<ServerStatus> serverStatus = new
AtomicReference(ServerStatus.ACTIVE);
private volatile boolean running;
private ExecutorService executorService;
private Future<?> decommissionFuture;
@@ -226,7 +226,7 @@ public class ShuffleServer {
if (healthCheckEnable) {
List<Checker> builtInCheckers = Lists.newArrayList();
builtInCheckers.add(storageManager.getStorageChecker());
- healthCheck = new HealthCheck(isHealthy, shuffleServerConf,
builtInCheckers);
+ healthCheck = new HealthCheck(serverStatus, shuffleServerConf,
builtInCheckers);
healthCheck.start();
}
@@ -315,7 +315,11 @@ public class ShuffleServer {
}
public ServerStatus getServerStatus() {
- return serverStatus;
+ return serverStatus.get();
+ }
+
+ public void setServerStatus(ServerStatus serverStatus) {
+ this.serverStatus.set(serverStatus);
}
public synchronized void decommission() {
@@ -323,11 +327,11 @@ public class ShuffleServer {
LOG.info("Shuffle Server is decommissioning. Nothing needs to be done.");
return;
}
- if (!ServerStatus.ACTIVE.equals(serverStatus)) {
+ if (!ServerStatus.ACTIVE.equals(serverStatus.get())) {
throw new InvalidRequestException(
"Shuffle Server is processing other procedures, current status:" +
serverStatus);
}
- serverStatus = ServerStatus.DECOMMISSIONING;
+ serverStatus.set(ServerStatus.DECOMMISSIONING);
LOG.info("Shuffle Server is decommissioning.");
if (executorService == null) {
executorService =
ThreadUtils.getDaemonSingleThreadExecutor("shuffle-server-decommission");
@@ -342,7 +346,7 @@ public class ShuffleServer {
while (isDecommissioning()) {
remainApplicationNum = shuffleTaskManager.getAppIds().size();
if (remainApplicationNum == 0) {
- serverStatus = ServerStatus.DECOMMISSIONED;
+ serverStatus.set(ServerStatus.DECOMMISSIONED);
LOG.info("All applications finished. Current status is " +
serverStatus);
if (shutdownAfterDecommission) {
LOG.info("Exiting...");
@@ -374,11 +378,11 @@ public class ShuffleServer {
LOG.info("Shuffle server is not decommissioning. Nothing needs to be
done.");
return;
}
- if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) {
- serverStatus = ServerStatus.ACTIVE;
+ if (ServerStatus.DECOMMISSIONED.equals(serverStatus.get())) {
+ serverStatus.set(ServerStatus.ACTIVE);
return;
}
- serverStatus = ServerStatus.ACTIVE;
+ serverStatus.set(ServerStatus.ACTIVE);
if (decommissionFuture.cancel(true)) {
LOG.info("Decommission canceled.");
} else {
@@ -453,13 +457,9 @@ public class ShuffleServer {
return Collections.unmodifiableSet(tags);
}
- public boolean isHealthy() {
- return isHealthy.get();
- }
-
@VisibleForTesting
public void markUnhealthy() {
- isHealthy.set(false);
+ serverStatus.set(ServerStatus.UNHEALTHY);
}
public GRPCMetrics getGrpcMetrics() {
@@ -467,8 +467,8 @@ public class ShuffleServer {
}
public boolean isDecommissioning() {
- return ServerStatus.DECOMMISSIONING.equals(serverStatus)
- || ServerStatus.DECOMMISSIONED.equals(serverStatus);
+ return ServerStatus.DECOMMISSIONING.equals(serverStatus.get())
+ || ServerStatus.DECOMMISSIONED.equals(serverStatus.get());
}
@VisibleForTesting
diff --git a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
index 18b16bb8..1fce9cd3 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java
@@ -17,7 +17,7 @@
package org.apache.uniffle.server;
-class HealthyMockChecker extends Checker {
+public class HealthyMockChecker extends Checker {
@SuppressWarnings("checkstyle:RedundantModifier")
public HealthyMockChecker(ShuffleServerConf conf) {
diff --git a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
index 1629335e..18e673fb 100644
--- a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
+++ b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java
@@ -17,7 +17,7 @@
package org.apache.uniffle.server;
-class UnHealthyMockChecker extends Checker {
+public class UnHealthyMockChecker extends Checker {
@SuppressWarnings("checkstyle:RedundantModifier")
public UnHealthyMockChecker(ShuffleServerConf conf) {