This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ac38db5b8 [#1173] fix: incorrect shuffle server status (#1174)
ac38db5b8 is described below
commit ac38db5b899d010ba5115f38dc4c3a8deb933eca
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Aug 28 17:05:36 2023 +0800
[#1173] fix: incorrect shuffle server status (#1174)
### What changes were proposed in this pull request?
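Fix incorrect shuffle server status reported by the coordinator:
- The heartbeat check in `SimpleClusterManager` no longer resets every node that is neither lost nor unhealthy back to `ACTIVE`. Previously this overwrote other statuses such as `DECOMMISSIONED`.
- The unfiltered `server/nodes` endpoint now uses `clusterManager.list()` instead of `getServerList(Collections.emptySet())`, which skipped every non-`ACTIVE` node.
- Lost nodes are now removed from the unhealthy set in the lost-node cleanup loop.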
### Why are the changes needed?
Fix: #1173
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
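UT: added `ServletTest#testDecommissionedNodeServlet`, which decommissions a shuffle server and checks that it is listed by `server/nodes?status=DECOMMISSIONED`.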
---
.../uniffle/coordinator/SimpleClusterManager.java | 7 ++----
.../coordinator/web/resource/ServerResource.java | 3 +--
.../java/org/apache/uniffle/test/ServletTest.java | 25 ++++++++++++++++++++++
3 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 4ea6ee0d0..93b94024c 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -139,19 +139,18 @@ public class SimpleClusterManager implements ClusterManager {
LOG.warn("Heartbeat timeout detect, {} will be removed from node
list.", sn);
sn.setStatus(ServerStatus.LOST);
lostNodes.add(sn);
- unhealthyNodes.remove(sn);
} else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
unhealthyNodes.add(sn);
lostNodes.remove(sn);
} else {
- sn.setStatus(ServerStatus.ACTIVE);
lostNodes.remove(sn);
unhealthyNodes.remove(sn);
}
}
for (ServerNode server : lostNodes) {
ServerNode sn = servers.remove(server.getId());
+ unhealthyNodes.remove(sn);
if (sn != null) {
clientCache.invalidate(sn);
for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
@@ -247,9 +246,7 @@ public class SimpleClusterManager implements ClusterManager {
if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
continue;
}
- if (!excludeNodes.contains(node.getId())
- && node.getTags().containsAll(requiredTags)
- && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ if (!excludeNodes.contains(node.getId()) &&
node.getTags().containsAll(requiredTags)) {
availableNodes.add(node);
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
index e3e21bec3..f4d193fe5 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.coordinator.web.resource;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -71,7 +70,7 @@ public class ServerResource extends BaseResource {
} else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) {
serverList = clusterManager.getLostServerList();
} else {
- serverList = clusterManager.getServerList(Collections.emptySet());
+ serverList = clusterManager.list();
}
serverList =
serverList.stream()
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index dc6918aa9..2e84c9df1 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -63,6 +63,8 @@ public class ServletTest extends IntegrationTestBase {
private static final String NODES_URL = URL_PREFIX + "server/nodes";
private static final String LOSTNODES_URL = URL_PREFIX +
"server/nodes?status=LOST";
private static final String UNHEALTHYNODES_URL = URL_PREFIX +
"server/nodes?status=UNHEALTHY";
+ private static final String DECOMMISSIONEDNODES_URL =
+ URL_PREFIX + "server/nodes?status=DECOMMISSIONED";
private static final String DECOMMISSION_URL = URL_PREFIX +
"server/decommission";
private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX +
"server/cancelDecommission";
private static final String DECOMMISSION_SINGLENODE_URL = URL_PREFIX +
"server/%s/decommission";
@@ -81,6 +83,7 @@ public class ServletTest extends IntegrationTestBase {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM,
"127.0.0.1:12346");
+ shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN,
false);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
List<String> basePath =
@@ -169,6 +172,37 @@ public class ServletTest extends IntegrationTestBase {
assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds,
shuffleIds));
}
+  /**
+   * Decommissions one shuffle server and waits until the coordinator reports it under
+   * {@code status=DECOMMISSIONED}. The decommission is always cancelled afterwards so the
+   * server shared with the other tests is returned to its previous state.
+   */
+  @Test
+  public void testDecommissionedNodeServlet() {
+    ShuffleServer shuffleServer = shuffleServers.get(1);
+    shuffleServer.decommission();
+    try {
+      Awaitility.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Response<List<HashMap<String, Object>>> response =
+                    objectMapper.readValue(
+                        TestUtils.httpGet(DECOMMISSIONEDNODES_URL),
+                        new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+                List<HashMap<String, Object>> serverList = response.getData();
+                // Match on any entry, not only the first, so the check does not depend
+                // on the order in which decommissioned servers are listed.
+                return serverList != null
+                    && serverList.stream()
+                        .anyMatch(server -> shuffleServer.getId().equals(server.get("id")));
+              });
+    } finally {
+      // Restore the shared server even if the wait above times out.
+      shuffleServer.cancelDecommission();
+    }
+  }
+
@Test
public void testUnhealthyNodesServlet() {
ShuffleServer shuffleServer3 = shuffleServers.get(2);