This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fddace29d [#1675][FOLLOWUP] fix(test): Fix various flaky tests (#1730)
fddace29d is described below
commit fddace29d6a7ed4cee69adcb9a656fb62983dc43
Author: RickyMa <[email protected]>
AuthorDate: Thu May 23 09:49:51 2024 +0800
[#1675][FOLLOWUP] fix(test): Fix various flaky tests (#1730)
### What changes were proposed in this pull request?
Fix various flaky tests:
1. After using `SimpleClusterManager`, release resources at once, which
might cause tests to fail sometimes.
2. The machine compiling the code may not necessarily have a valid broadcast address
(maybe in a pod created by K8S, and the broadcast address could be 0.0.0.0).
Unit tests should run successfully on such machines. We need to support
this scenario.
3. Refactor the `URI.create` logic in `SimpleClusterManagerTest` to support
cross-platform operation, such as running tests on Windows.
4. Use `IntegrationTestBase#shutdownServers` to reduce duplicated codes.
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1675.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unnecessary.
---
.../org/apache/uniffle/common/util/RssUtils.java | 6 +-
.../coordinator/SimpleClusterManagerTest.java | 173 ++++++++++---------
.../PartitionBalanceAssignmentStrategyTest.java | 191 +++++++++++----------
integration-test/common/pom.xml | 1 +
.../uniffle/test/CoordinatorAssignmentTest.java | 23 +--
.../uniffle/test/DiskErrorToleranceTest.java | 19 +-
.../java/org/apache/uniffle/test/QuorumTest.java | 13 +-
.../apache/uniffle/test/RpcClientRetryTest.java | 13 +-
.../java/org/apache/uniffle/test/ServletTest.java | 39 +++--
.../test/ShuffleServerFaultToleranceTest.java | 18 +-
integration-test/mr/pom.xml | 1 +
integration-test/spark-common/pom.xml | 1 +
integration-test/tez/pom.xml | 1 +
pom.xml | 1 +
14 files changed, 236 insertions(+), 264 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 5c65f5eb9..121fe10de 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -123,6 +123,10 @@ public class RssUtils {
}
return ip;
}
+ // Unit tests are executed on a single machine and do not interact with
other machines.
+ // Therefore, unit tests should not require a valid broadcast address.
+ // When running UTs, we will still return the IP address whose broadcast
address is invalid.
+ boolean isTestMode = Boolean.parseBoolean(System.getProperty("test.mode",
"false"));
Enumeration<NetworkInterface> nif =
NetworkInterface.getNetworkInterfaces();
String siteLocalAddress = null;
while (nif.hasMoreElements()) {
@@ -133,7 +137,7 @@ public class RssUtils {
for (InterfaceAddress ifa : ni.getInterfaceAddresses()) {
InetAddress ia = ifa.getAddress();
InetAddress brd = ifa.getBroadcast();
- if (brd == null || brd.isAnyLocalAddress()) {
+ if ((brd == null || brd.isAnyLocalAddress()) && !isTestMode) {
LOGGER.info(
"ip {} was filtered, because it don't have effective broadcast
address",
ia.getHostAddress());
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index affe24d21..0f2411b88 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.net.URI;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -132,40 +133,41 @@ public class SimpleClusterManagerTest {
CoordinatorConf coordinatorConf = new CoordinatorConf();
// Shorten the heartbeat time
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
- SimpleClusterManager clusterManager =
- new SimpleClusterManager(coordinatorConf, new Configuration());
- ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10,
grpcTags);
- ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10,
grpcTags);
- ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11,
grpcTags);
- clusterManager.add(sn1);
- clusterManager.add(sn2);
- clusterManager.add(sn3);
- Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
- await()
- .atMost(1, TimeUnit.SECONDS)
- .until(
- () -> {
- Set<String> lostServerList =
- clusterManager.getLostServerList().stream()
- .map(ServerNode::getId)
- .collect(Collectors.toSet());
- return CollectionUtils.isEqualCollection(lostServerList,
expectedIds);
- });
- // re-register sn3
- sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
- clusterManager.add(sn3);
- Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
- await()
- .atMost(1, TimeUnit.SECONDS)
- .until(
- () -> {
- // Retrieve listed ServerNode List
- Set<String> lostServerListre =
- clusterManager.getLostServerList().stream()
- .map(ServerNode::getId)
- .collect(Collectors.toSet());
- return CollectionUtils.isEqualCollection(lostServerListre,
expectedIdsre);
- });
+ try (SimpleClusterManager clusterManager =
+ new SimpleClusterManager(coordinatorConf, new Configuration())) {
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10,
grpcTags);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10,
grpcTags);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11,
grpcTags);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ Set<String> lostServerList =
+ clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId)
+ .collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(lostServerList,
expectedIds);
+ });
+ // re-register sn3
+ sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
+ clusterManager.add(sn3);
+ Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ // Retrieve listed ServerNode List
+ Set<String> lostServerListre =
+ clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId)
+ .collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(lostServerListre,
expectedIdsre);
+ });
+ }
}
@Test
@@ -173,50 +175,51 @@ public class SimpleClusterManagerTest {
CoordinatorConf coordinatorConf = new CoordinatorConf();
// Shorten the heartbeat time
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
- SimpleClusterManager clusterManager =
- new SimpleClusterManager(coordinatorConf, new Configuration());
- ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10,
grpcTags);
- ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10,
grpcTags);
- ServerNode sn3 =
- new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
- ServerNode sn4 =
- new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
- clusterManager.add(sn1);
- clusterManager.add(sn2);
- clusterManager.add(sn3);
- clusterManager.add(sn4);
- // Analog timeout registration
- Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
- await()
- .atMost(1, TimeUnit.SECONDS)
- .until(
- () -> {
- Set<String> unhealthyServerList =
- clusterManager.getUnhealthyServerList().stream()
- .map(ServerNode::getId)
- .collect(Collectors.toSet());
- return CollectionUtils.isEqualCollection(unhealthyServerList,
expectedIds);
- });
- // Register unhealthy node sn3 again
- sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
- clusterManager.add(sn3);
- Set<String> expectedIdsre = Sets.newHashSet("sn3");
- await()
- .atMost(1, TimeUnit.SECONDS)
- .until(
- () -> {
- Set<String> unhealthyServerListre =
- clusterManager.getUnhealthyServerList().stream()
- .map(ServerNode::getId)
- .collect(Collectors.toSet());
- return CollectionUtils.isEqualCollection(unhealthyServerListre,
expectedIdsre);
- });
- // At this point verify that sn4 is in the lost list
- List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
- Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2",
"sn4");
- assertEquals(
- expectedIdlostremoveunhealthy,
-
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+ try (SimpleClusterManager clusterManager =
+ new SimpleClusterManager(coordinatorConf, new Configuration())) {
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10,
grpcTags);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10,
grpcTags);
+ ServerNode sn3 =
+ new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
+ ServerNode sn4 =
+ new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ clusterManager.add(sn4);
+ // Analog timeout registration
+ Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ Set<String> unhealthyServerList =
+ clusterManager.getUnhealthyServerList().stream()
+ .map(ServerNode::getId)
+ .collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(unhealthyServerList,
expectedIds);
+ });
+ // Register unhealthy node sn3 again
+ sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags,
ServerStatus.UNHEALTHY);
+ clusterManager.add(sn3);
+ Set<String> expectedIdsre = Sets.newHashSet("sn3");
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ Set<String> unhealthyServerListre =
+ clusterManager.getUnhealthyServerList().stream()
+ .map(ServerNode::getId)
+ .collect(Collectors.toSet());
+ return
CollectionUtils.isEqualCollection(unhealthyServerListre, expectedIdsre);
+ });
+ // At this point verify that sn4 is in the lost list
+ List<ServerNode> lostremoveunhealthy =
clusterManager.getLostServerList();
+ Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1",
"sn2", "sn4");
+ assertEquals(
+ expectedIdlostremoveunhealthy,
+
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+ }
}
@Test
@@ -423,11 +426,12 @@ public class SimpleClusterManagerTest {
public void updateExcludeNodesTest() throws Exception {
String excludeNodesFolder =
(new
File(ClassLoader.getSystemResource("empty").getFile())).getParent();
- String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
+ String excludeNodesPath = Paths.get(excludeNodesFolder,
"excludeNodes").toString();
CoordinatorConf ssc = new CoordinatorConf();
+ File excludeNodesFile = new File(excludeNodesPath);
+ URI excludeNodesUri = excludeNodesFile.toURI();
ssc.setString(
- CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
- URI.create(excludeNodesPath).toString());
+ CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
excludeNodesUri.toURL().toString());
ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL,
1000);
try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new
Configuration())) {
@@ -492,11 +496,12 @@ public class SimpleClusterManagerTest {
public void excludeNodesNoDelayTest() throws Exception {
String excludeNodesFolder =
(new
File(ClassLoader.getSystemResource("empty").getFile())).getParent();
- String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
+ String excludeNodesPath = Paths.get(excludeNodesFolder,
"excludeNodes").toString();
CoordinatorConf ssc = new CoordinatorConf();
+ File excludeNodesFile = new File(excludeNodesPath);
+ URI excludeNodesUri = excludeNodesFile.toURI();
ssc.setString(
- CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
- URI.create(excludeNodesPath).toString());
+ CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
excludeNodesUri.toURL().toString());
ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL,
5000);
final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index ad830de79..2d80c65a4 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -308,49 +308,52 @@ public class PartitionBalanceAssignmentStrategyTest {
ssc.set(
CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
AbstractAssignmentStrategy.HostAssignmentStrategyName.MUST_DIFF);
- SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new
Configuration());
- AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- Set<String> serverTags = Sets.newHashSet("tag-1");
+ Set<String> serverTags = Sets.newHashSet("tag-1");
- for (int i = 0; i < 5; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
- }
- for (int i = 0; i < 5; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20
- i, 0, serverTags));
- }
- PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
- pra.getAssignments()
- .values()
- .forEach(
- (nodeList) -> {
- Map<String, ServerNode> nodeMap = new HashMap<>();
- nodeList.forEach(
- (node) -> {
- ServerNode serverNode = nodeMap.get(node.getIp());
- assertNull(serverNode);
- nodeMap.put(node.getIp(), node);
- });
- });
-
- pra = strategy.assign(100, 1, 6, serverTags, -1, -1);
- pra.getAssignments()
- .values()
- .forEach(
- (nodeList) -> {
- Map<String, ServerNode> nodeMap = new HashMap<>();
- boolean hasSameHost = false;
- for (ServerNode node : nodeList) {
- ServerNode serverNode = nodeMap.get(node.getIp());
- if (serverNode != null) {
- hasSameHost = true;
- break;
+ for (int i = 0; i < 5; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ for (int i = 0; i < 5; ++i) {
+ clusterManager.add(
+ new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0,
serverTags));
+ }
+ PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags,
-1, -1);
+ pra.getAssignments()
+ .values()
+ .forEach(
+ (nodeList) -> {
+ Map<String, ServerNode> nodeMap = new HashMap<>();
+ nodeList.forEach(
+ (node) -> {
+ ServerNode serverNode = nodeMap.get(node.getIp());
+ assertNull(serverNode);
+ nodeMap.put(node.getIp(), node);
+ });
+ });
+
+ pra = strategy.assign(100, 1, 6, serverTags, -1, -1);
+ pra.getAssignments()
+ .values()
+ .forEach(
+ (nodeList) -> {
+ Map<String, ServerNode> nodeMap = new HashMap<>();
+ boolean hasSameHost = false;
+ for (ServerNode node : nodeList) {
+ ServerNode serverNode = nodeMap.get(node.getIp());
+ if (serverNode != null) {
+ hasSameHost = true;
+ break;
+ }
+ assertNull(serverNode);
+ nodeMap.put(node.getIp(), node);
}
- assertNull(serverNode);
- nodeMap.put(node.getIp(), node);
- }
- assertTrue(hasSameHost);
- });
+ assertTrue(hasSameHost);
+ });
+ }
}
@Test
@@ -360,46 +363,51 @@ public class PartitionBalanceAssignmentStrategyTest {
ssc.set(
CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
AbstractAssignmentStrategy.HostAssignmentStrategyName.PREFER_DIFF);
- SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new
Configuration());
- AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
Set<String> serverTags = Sets.newHashSet("tag-1");
-
- for (int i = 0; i < 3; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
- }
- for (int i = 0; i < 2; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20
- i, 0, serverTags));
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+ for (int i = 0; i < 3; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ for (int i = 0; i < 2; ++i) {
+ clusterManager.add(
+ new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0,
serverTags));
+ }
+ PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags,
-1, -1);
+ pra.getAssignments()
+ .values()
+ .forEach(
+ (nodeList) -> {
+ assertEquals(5, nodeList.size());
+ });
}
- PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
- pra.getAssignments()
- .values()
- .forEach(
- (nodeList) -> {
- assertEquals(5, nodeList.size());
- });
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 3);
- clusterManager = new SimpleClusterManager(ssc, new Configuration());
- for (int i = 0; i < 3; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
- }
- for (int i = 0; i < 2; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20
- i, 0, serverTags));
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ for (int i = 0; i < 3; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ for (int i = 0; i < 2; ++i) {
+ clusterManager.add(
+ new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0,
serverTags));
+ }
+ AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+ PartitionRangeAssignment pra = strategy.assign(100, 1, 3, serverTags,
-1, -1);
+ pra.getAssignments()
+ .values()
+ .forEach(
+ (nodeList) -> {
+ Map<String, ServerNode> nodeMap = new HashMap<>();
+ nodeList.forEach(
+ (node) -> {
+ ServerNode serverNode = nodeMap.get(node.getIp());
+ assertNull(serverNode);
+ nodeMap.put(node.getIp(), node);
+ });
+ });
}
- strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- pra = strategy.assign(100, 1, 3, serverTags, -1, -1);
- pra.getAssignments()
- .values()
- .forEach(
- (nodeList) -> {
- Map<String, ServerNode> nodeMap = new HashMap<>();
- nodeList.forEach(
- (node) -> {
- ServerNode serverNode = nodeMap.get(node.getIp());
- assertNull(serverNode);
- nodeMap.put(node.getIp(), node);
- });
- });
}
@Test
@@ -409,23 +417,26 @@ public class PartitionBalanceAssignmentStrategyTest {
ssc.set(
CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
AbstractAssignmentStrategy.HostAssignmentStrategyName.NONE);
- SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new
Configuration());
- AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- Set<String> serverTags = Sets.newHashSet("tag-1");
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ AssignmentStrategy strategy = new
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+ Set<String> serverTags = Sets.newHashSet("tag-1");
- for (int i = 0; i < 3; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
- }
- for (int i = 0; i < 2; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20
- i, 0, serverTags));
+ for (int i = 0; i < 3; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ for (int i = 0; i < 2; ++i) {
+ clusterManager.add(
+ new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0,
serverTags));
+ }
+ PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags,
-1, -1);
+ pra.getAssignments()
+ .values()
+ .forEach(
+ (nodeList) -> {
+ assertEquals(5, nodeList.size());
+ });
}
- PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1,
-1);
- pra.getAssignments()
- .values()
- .forEach(
- (nodeList) -> {
- assertEquals(5, nodeList.size());
- });
}
@Test
diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml
index 01b1b91c9..3626208a2 100644
--- a/integration-test/common/pom.xml
+++ b/integration-test/common/pom.xml
@@ -173,6 +173,7 @@
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<project.version>${project.version}</project.version>
+ <test.mode>true</test.mode>
</systemProperties>
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
<useFile>${test.redirectToFile}</useFile>
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index fcad6fa7e..a2fdce988 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -129,19 +129,20 @@ public class CoordinatorAssignmentTest extends
CoordinatorTestBase {
assertEquals(SHUFFLE_NODES_MAX,
info.getServerToPartitionRanges().keySet().size());
// Case2: Enable silent period mechanism, it should fallback to slave
coordinator.
- SimpleClusterManager clusterManager =
- (SimpleClusterManager) coordinators.get(0).getClusterManager();
- clusterManager.setReadyForServe(false);
- clusterManager.setStartupSilentPeriodEnabled(true);
- clusterManager.setStartTime(System.currentTimeMillis() - 1);
+ try (SimpleClusterManager clusterManager =
+ (SimpleClusterManager) coordinators.get(0).getClusterManager()) {
+ clusterManager.setReadyForServe(false);
+ clusterManager.setStartupSilentPeriodEnabled(true);
+ clusterManager.setStartTime(System.currentTimeMillis() - 1);
- if (clusterManager.getNodesNum() < 10) {
- info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS,
-1, -1);
- assertEquals(SHUFFLE_NODES_MAX,
info.getServerToPartitionRanges().keySet().size());
- }
+ if (clusterManager.getNodesNum() < 10) {
+ info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1,
TAGS, -1, -1);
+ assertEquals(SHUFFLE_NODES_MAX,
info.getServerToPartitionRanges().keySet().size());
+ }
- // recover
- clusterManager.setReadyForServe(true);
+ // recover
+ clusterManager.setReadyForServe(true);
+ }
}
@Test
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 4029d1ca6..9cfe686e5 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -54,8 +54,6 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorServer;
-import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -136,22 +134,7 @@ public class DiskErrorToleranceTest extends
ShuffleReadWriteBase {
public void closeClient() throws Exception {
grpcShuffleServerClient.close();
nettyShuffleServerClient.close();
- cleanCluster();
- }
-
- public static void cleanCluster() throws Exception {
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.stopServer();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.stopServer();
- }
- for (ShuffleServer shuffleServer : nettyShuffleServers) {
- shuffleServer.stopServer();
- }
- grpcShuffleServers = Lists.newArrayList();
- nettyShuffleServers = Lists.newArrayList();
- coordinators = Lists.newArrayList();
+ shutdownServers();
}
private static Stream<Arguments> diskErrorTestProvider() {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 6662a3670..d6af2d06b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -190,23 +190,12 @@ public class QuorumTest extends ShuffleReadWriteBase {
Thread.sleep(2000);
}
- public static void cleanCluster() throws Exception {
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.stopServer();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.stopServer();
- }
- grpcShuffleServers = Lists.newArrayList();
- coordinators = Lists.newArrayList();
- }
-
@AfterEach
public void cleanEnv() throws Exception {
if (shuffleWriteClientImpl != null) {
shuffleWriteClientImpl.close();
}
- cleanCluster();
+ shutdownServers();
// we need recovery `rpcTime`, or some unit tests may fail
((ShuffleServerGrpcClient)
ShuffleServerClientFactory.getInstance()
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
index d082ae025..100b645e7 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
@@ -119,23 +119,12 @@ public class RpcClientRetryTest extends
ShuffleReadWriteBase {
}
}
- public static void cleanCluster() throws Exception {
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.stopServer();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.stopServer();
- }
- grpcShuffleServers = Lists.newArrayList();
- coordinators = Lists.newArrayList();
- }
-
@AfterAll
public static void cleanEnv() throws Exception {
if (shuffleWriteClientImpl != null) {
shuffleWriteClientImpl.close();
}
- cleanCluster();
+ shutdownServers();
}
private static Stream<Arguments> testRpcRetryLogicProvider() {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index e5cc08853..3f191ae53 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -170,26 +170,27 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testLostNodesServlet() throws IOException {
- SimpleClusterManager clusterManager =
- (SimpleClusterManager) coordinatorServer.getClusterManager();
- ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
- ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
- Map<String, ServerNode> servers = clusterManager.getServers();
-
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() -
40000);
-
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() -
40000);
- clusterManager.nodesCheckTest();
- List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(),
shuffleServer4.getId());
- List<String> shuffleIds = new ArrayList<>();
- Response<List<HashMap<String, Object>>> response =
- objectMapper.readValue(
- TestUtils.httpGet(LOSTNODES_URL),
- new TypeReference<Response<List<HashMap<String, Object>>>>() {});
- List<HashMap<String, Object>> serverList = response.getData();
- for (HashMap<String, Object> stringObjectHashMap : serverList) {
- String shuffleId = (String) stringObjectHashMap.get("id");
- shuffleIds.add(shuffleId);
+ try (SimpleClusterManager clusterManager =
+ (SimpleClusterManager) coordinatorServer.getClusterManager()) {
+ ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
+ ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+ Map<String, ServerNode> servers = clusterManager.getServers();
+
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() -
40000);
+
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() -
40000);
+ clusterManager.nodesCheckTest();
+ List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(),
shuffleServer4.getId());
+ List<String> shuffleIds = new ArrayList<>();
+ Response<List<HashMap<String, Object>>> response =
+ objectMapper.readValue(
+ TestUtils.httpGet(LOSTNODES_URL),
+ new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+ List<HashMap<String, Object>> serverList = response.getData();
+ for (HashMap<String, Object> stringObjectHashMap : serverList) {
+ String shuffleId = (String) stringObjectHashMap.get("id");
+ shuffleIds.add(shuffleId);
+ }
+ assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds,
shuffleIds));
}
- assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds,
shuffleIds));
}
@Test
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 2eee21227..d08bf80bd 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -50,7 +50,6 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.server.MockedShuffleServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -107,7 +106,7 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
(client) -> {
client.close();
});
- cleanCluster();
+ shutdownServers();
}
private static Stream<Arguments> testReadFaultToleranceProvider() {
@@ -330,19 +329,4 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
retry++;
}
}
-
- public static void cleanCluster() throws Exception {
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.stopServer();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.stopServer();
- }
- for (ShuffleServer shuffleServer : nettyShuffleServers) {
- shuffleServer.stopServer();
- }
- grpcShuffleServers = Lists.newArrayList();
- nettyShuffleServers = Lists.newArrayList();
- coordinators = Lists.newArrayList();
- }
}
diff --git a/integration-test/mr/pom.xml b/integration-test/mr/pom.xml
index 67e119ab5..c1330fe49 100644
--- a/integration-test/mr/pom.xml
+++ b/integration-test/mr/pom.xml
@@ -157,6 +157,7 @@
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<project.version>${project.version}</project.version>
+ <test.mode>true</test.mode>
</systemProperties>
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
<useFile>${test.redirectToFile}</useFile>
diff --git a/integration-test/spark-common/pom.xml
b/integration-test/spark-common/pom.xml
index 6d8a6b30b..a20c3812d 100644
--- a/integration-test/spark-common/pom.xml
+++ b/integration-test/spark-common/pom.xml
@@ -190,6 +190,7 @@
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<project.version>${project.version}</project.version>
+ <test.mode>true</test.mode>
</systemProperties>
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
<useFile>${test.redirectToFile}</useFile>
diff --git a/integration-test/tez/pom.xml b/integration-test/tez/pom.xml
index cabdeab25..e9db188bb 100644
--- a/integration-test/tez/pom.xml
+++ b/integration-test/tez/pom.xml
@@ -162,6 +162,7 @@
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<project.version>${project.version}</project.version>
+ <test.mode>true</test.mode>
</systemProperties>
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
<useFile>${test.redirectToFile}</useFile>
diff --git a/pom.xml b/pom.xml
index 56ccff464..e015e8e5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -921,6 +921,7 @@
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<project.version>${project.version}</project.version>
<jacoco-agent.destfile>target/jacoco.exec</jacoco-agent.destfile>
+ <test.mode>true</test.mode>
</systemProperties>
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
<useFile>${test.redirectToFile}</useFile>