This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new eaa7af457 [#1675][FOLLOWUP] fix(test): Fix various flaky tests (#1730)
eaa7af457 is described below

commit eaa7af4570eafc42d53cba3c70c0a736edb8082f
Author: RickyMa <[email protected]>
AuthorDate: Thu May 23 09:49:51 2024 +0800

    [#1675][FOLLOWUP] fix(test): Fix various flaky tests (#1730)
    
    ### What changes were proposed in this pull request?
    
    Fix various flaky tests:
    1. After using `SimpleClusterManager`, release its resources immediately; otherwise, 
lingering resources might cause tests to fail sometimes.
    2. The build machine may not necessarily have a valid broadcast address 
(e.g. in a pod created by K8S, the broadcast address could be 0.0.0.0). 
Unit tests should run successfully on such machines. We need to support 
this scenario.
    3. Refactor the `URI.create` logic in `SimpleClusterManagerTest` to support 
cross-platform operation, such as running tests on Windows.
    4. Use `IntegrationTestBase#shutdownServers` to reduce duplicated codes.
    
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1675.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unnecessary.
---
 .../org/apache/uniffle/common/util/RssUtils.java   |   6 +-
 .../coordinator/SimpleClusterManagerTest.java      | 173 ++++++++++---------
 .../PartitionBalanceAssignmentStrategyTest.java    | 191 +++++++++++----------
 integration-test/common/pom.xml                    |   1 +
 .../uniffle/test/CoordinatorAssignmentTest.java    |  23 +--
 .../uniffle/test/DiskErrorToleranceTest.java       |  19 +-
 .../java/org/apache/uniffle/test/QuorumTest.java   |  13 +-
 .../apache/uniffle/test/RpcClientRetryTest.java    |  13 +-
 .../java/org/apache/uniffle/test/ServletTest.java  |  39 +++--
 .../test/ShuffleServerFaultToleranceTest.java      |  18 +-
 integration-test/mr/pom.xml                        |   1 +
 integration-test/spark-common/pom.xml              |   1 +
 integration-test/tez/pom.xml                       |   1 +
 pom.xml                                            |   1 +
 14 files changed, 236 insertions(+), 264 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 7026320aa..91ac5197c 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -123,6 +123,10 @@ public class RssUtils {
       }
       return ip;
     }
+    // Unit tests are executed on a single machine and do not interact with 
other machines.
+    // Therefore, unit tests should not require a valid broadcast address.
+    // When running UTs, we will still return the IP address whose broadcast 
address is invalid.
+    boolean isTestMode = Boolean.parseBoolean(System.getProperty("test.mode", 
"false"));
     Enumeration<NetworkInterface> nif = 
NetworkInterface.getNetworkInterfaces();
     String siteLocalAddress = null;
     while (nif.hasMoreElements()) {
@@ -133,7 +137,7 @@ public class RssUtils {
       for (InterfaceAddress ifa : ni.getInterfaceAddresses()) {
         InetAddress ia = ifa.getAddress();
         InetAddress brd = ifa.getBroadcast();
-        if (brd == null || brd.isAnyLocalAddress()) {
+        if ((brd == null || brd.isAnyLocalAddress()) && !isTestMode) {
           LOGGER.info(
               "ip {} was filtered, because it don't have effective broadcast 
address",
               ia.getHostAddress());
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index 4464d7497..43ab1b33e 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -132,40 +133,41 @@ public class SimpleClusterManagerTest {
     CoordinatorConf coordinatorConf = new CoordinatorConf();
     // Shorten the heartbeat time
     coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
-    SimpleClusterManager clusterManager =
-        new SimpleClusterManager(coordinatorConf, new Configuration());
-    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, 
grpcTags);
-    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, 
grpcTags);
-    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, 
grpcTags);
-    clusterManager.add(sn1);
-    clusterManager.add(sn2);
-    clusterManager.add(sn3);
-    Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
-    await()
-        .atMost(1, TimeUnit.SECONDS)
-        .until(
-            () -> {
-              Set<String> lostServerList =
-                  clusterManager.getLostServerList().stream()
-                      .map(ServerNode::getId)
-                      .collect(Collectors.toSet());
-              return CollectionUtils.isEqualCollection(lostServerList, 
expectedIds);
-            });
-    // re-register sn3
-    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
-    clusterManager.add(sn3);
-    Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
-    await()
-        .atMost(1, TimeUnit.SECONDS)
-        .until(
-            () -> {
-              // Retrieve listed ServerNode List
-              Set<String> lostServerListre =
-                  clusterManager.getLostServerList().stream()
-                      .map(ServerNode::getId)
-                      .collect(Collectors.toSet());
-              return CollectionUtils.isEqualCollection(lostServerListre, 
expectedIdsre);
-            });
+    try (SimpleClusterManager clusterManager =
+        new SimpleClusterManager(coordinatorConf, new Configuration())) {
+      ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, 
grpcTags);
+      ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, 
grpcTags);
+      ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, 
grpcTags);
+      clusterManager.add(sn1);
+      clusterManager.add(sn2);
+      clusterManager.add(sn3);
+      Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+      await()
+          .atMost(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Set<String> lostServerList =
+                    clusterManager.getLostServerList().stream()
+                        .map(ServerNode::getId)
+                        .collect(Collectors.toSet());
+                return CollectionUtils.isEqualCollection(lostServerList, 
expectedIds);
+              });
+      // re-register sn3
+      sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags);
+      clusterManager.add(sn3);
+      Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+      await()
+          .atMost(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                // Retrieve listed ServerNode List
+                Set<String> lostServerListre =
+                    clusterManager.getLostServerList().stream()
+                        .map(ServerNode::getId)
+                        .collect(Collectors.toSet());
+                return CollectionUtils.isEqualCollection(lostServerListre, 
expectedIdsre);
+              });
+    }
   }
 
   @Test
@@ -173,50 +175,51 @@ public class SimpleClusterManagerTest {
     CoordinatorConf coordinatorConf = new CoordinatorConf();
     // Shorten the heartbeat time
     coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
-    SimpleClusterManager clusterManager =
-        new SimpleClusterManager(coordinatorConf, new Configuration());
-    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, 
grpcTags);
-    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, 
grpcTags);
-    ServerNode sn3 =
-        new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
-    ServerNode sn4 =
-        new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
-    clusterManager.add(sn1);
-    clusterManager.add(sn2);
-    clusterManager.add(sn3);
-    clusterManager.add(sn4);
-    // Analog timeout registration
-    Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
-    await()
-        .atMost(1, TimeUnit.SECONDS)
-        .until(
-            () -> {
-              Set<String> unhealthyServerList =
-                  clusterManager.getUnhealthyServerList().stream()
-                      .map(ServerNode::getId)
-                      .collect(Collectors.toSet());
-              return CollectionUtils.isEqualCollection(unhealthyServerList, 
expectedIds);
-            });
-    // Register unhealthy node sn3 again
-    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
-    clusterManager.add(sn3);
-    Set<String> expectedIdsre = Sets.newHashSet("sn3");
-    await()
-        .atMost(1, TimeUnit.SECONDS)
-        .until(
-            () -> {
-              Set<String> unhealthyServerListre =
-                  clusterManager.getUnhealthyServerList().stream()
-                      .map(ServerNode::getId)
-                      .collect(Collectors.toSet());
-              return CollectionUtils.isEqualCollection(unhealthyServerListre, 
expectedIdsre);
-            });
-    // At this point verify that sn4 is in the lost list
-    List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
-    Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2", 
"sn4");
-    assertEquals(
-        expectedIdlostremoveunhealthy,
-        
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    try (SimpleClusterManager clusterManager =
+        new SimpleClusterManager(coordinatorConf, new Configuration())) {
+      ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, 
grpcTags);
+      ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, 
grpcTags);
+      ServerNode sn3 =
+          new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
+      ServerNode sn4 =
+          new ServerNode("sn4", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
+      clusterManager.add(sn1);
+      clusterManager.add(sn2);
+      clusterManager.add(sn3);
+      clusterManager.add(sn4);
+      // Analog timeout registration
+      Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
+      await()
+          .atMost(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Set<String> unhealthyServerList =
+                    clusterManager.getUnhealthyServerList().stream()
+                        .map(ServerNode::getId)
+                        .collect(Collectors.toSet());
+                return CollectionUtils.isEqualCollection(unhealthyServerList, 
expectedIds);
+              });
+      // Register unhealthy node sn3 again
+      sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, 11, grpcTags, 
ServerStatus.UNHEALTHY);
+      clusterManager.add(sn3);
+      Set<String> expectedIdsre = Sets.newHashSet("sn3");
+      await()
+          .atMost(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Set<String> unhealthyServerListre =
+                    clusterManager.getUnhealthyServerList().stream()
+                        .map(ServerNode::getId)
+                        .collect(Collectors.toSet());
+                return 
CollectionUtils.isEqualCollection(unhealthyServerListre, expectedIdsre);
+              });
+      // At this point verify that sn4 is in the lost list
+      List<ServerNode> lostremoveunhealthy = 
clusterManager.getLostServerList();
+      Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", 
"sn2", "sn4");
+      assertEquals(
+          expectedIdlostremoveunhealthy,
+          
lostremoveunhealthy.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    }
   }
 
   @Test
@@ -423,11 +426,12 @@ public class SimpleClusterManagerTest {
   public void updateExcludeNodesTest() throws Exception {
     String excludeNodesFolder =
         (new 
File(ClassLoader.getSystemResource("empty").getFile())).getParent();
-    String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
+    String excludeNodesPath = Paths.get(excludeNodesFolder, 
"excludeNodes").toString();
     CoordinatorConf ssc = new CoordinatorConf();
+    File excludeNodesFile = new File(excludeNodesPath);
+    URI excludeNodesUri = excludeNodesFile.toURI();
     ssc.setString(
-        CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
-        URI.create(excludeNodesPath).toString());
+        CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, 
excludeNodesUri.toURL().toString());
     ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 
1000);
 
     try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new 
Configuration())) {
@@ -492,11 +496,12 @@ public class SimpleClusterManagerTest {
   public void excludeNodesNoDelayTest() throws Exception {
     String excludeNodesFolder =
         (new 
File(ClassLoader.getSystemResource("empty").getFile())).getParent();
-    String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
+    String excludeNodesPath = Paths.get(excludeNodesFolder, 
"excludeNodes").toString();
     CoordinatorConf ssc = new CoordinatorConf();
+    File excludeNodesFile = new File(excludeNodesPath);
+    URI excludeNodesUri = excludeNodesFile.toURI();
     ssc.setString(
-        CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
-        URI.create(excludeNodesPath).toString());
+        CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, 
excludeNodesUri.toURL().toString());
     ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 
5000);
 
     final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index ad830de79..2d80c65a4 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -308,49 +308,52 @@ public class PartitionBalanceAssignmentStrategyTest {
     ssc.set(
         CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
         AbstractAssignmentStrategy.HostAssignmentStrategyName.MUST_DIFF);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
-    AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
 
-    Set<String> serverTags = Sets.newHashSet("tag-1");
+      Set<String> serverTags = Sets.newHashSet("tag-1");
 
-    for (int i = 0; i < 5; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
-    for (int i = 0; i < 5; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 
- i, 0, serverTags));
-    }
-    PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
-    pra.getAssignments()
-        .values()
-        .forEach(
-            (nodeList) -> {
-              Map<String, ServerNode> nodeMap = new HashMap<>();
-              nodeList.forEach(
-                  (node) -> {
-                    ServerNode serverNode = nodeMap.get(node.getIp());
-                    assertNull(serverNode);
-                    nodeMap.put(node.getIp(), node);
-                  });
-            });
-
-    pra = strategy.assign(100, 1, 6, serverTags, -1, -1);
-    pra.getAssignments()
-        .values()
-        .forEach(
-            (nodeList) -> {
-              Map<String, ServerNode> nodeMap = new HashMap<>();
-              boolean hasSameHost = false;
-              for (ServerNode node : nodeList) {
-                ServerNode serverNode = nodeMap.get(node.getIp());
-                if (serverNode != null) {
-                  hasSameHost = true;
-                  break;
+      for (int i = 0; i < 5; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      for (int i = 0; i < 5; ++i) {
+        clusterManager.add(
+            new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, 
-1, -1);
+      pra.getAssignments()
+          .values()
+          .forEach(
+              (nodeList) -> {
+                Map<String, ServerNode> nodeMap = new HashMap<>();
+                nodeList.forEach(
+                    (node) -> {
+                      ServerNode serverNode = nodeMap.get(node.getIp());
+                      assertNull(serverNode);
+                      nodeMap.put(node.getIp(), node);
+                    });
+              });
+
+      pra = strategy.assign(100, 1, 6, serverTags, -1, -1);
+      pra.getAssignments()
+          .values()
+          .forEach(
+              (nodeList) -> {
+                Map<String, ServerNode> nodeMap = new HashMap<>();
+                boolean hasSameHost = false;
+                for (ServerNode node : nodeList) {
+                  ServerNode serverNode = nodeMap.get(node.getIp());
+                  if (serverNode != null) {
+                    hasSameHost = true;
+                    break;
+                  }
+                  assertNull(serverNode);
+                  nodeMap.put(node.getIp(), node);
                 }
-                assertNull(serverNode);
-                nodeMap.put(node.getIp(), node);
-              }
-              assertTrue(hasSameHost);
-            });
+                assertTrue(hasSameHost);
+              });
+    }
   }
 
   @Test
@@ -360,46 +363,51 @@ public class PartitionBalanceAssignmentStrategyTest {
     ssc.set(
         CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
         AbstractAssignmentStrategy.HostAssignmentStrategyName.PREFER_DIFF);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
-    AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
     Set<String> serverTags = Sets.newHashSet("tag-1");
-
-    for (int i = 0; i < 3; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
-    for (int i = 0; i < 2; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 
- i, 0, serverTags));
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+      for (int i = 0; i < 3; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      for (int i = 0; i < 2; ++i) {
+        clusterManager.add(
+            new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, 
-1, -1);
+      pra.getAssignments()
+          .values()
+          .forEach(
+              (nodeList) -> {
+                assertEquals(5, nodeList.size());
+              });
     }
-    PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
-    pra.getAssignments()
-        .values()
-        .forEach(
-            (nodeList) -> {
-              assertEquals(5, nodeList.size());
-            });
 
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 3);
-    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    for (int i = 0; i < 3; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
-    for (int i = 0; i < 2; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 
- i, 0, serverTags));
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      for (int i = 0; i < 3; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      for (int i = 0; i < 2; ++i) {
+        clusterManager.add(
+            new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+      PartitionRangeAssignment pra = strategy.assign(100, 1, 3, serverTags, 
-1, -1);
+      pra.getAssignments()
+          .values()
+          .forEach(
+              (nodeList) -> {
+                Map<String, ServerNode> nodeMap = new HashMap<>();
+                nodeList.forEach(
+                    (node) -> {
+                      ServerNode serverNode = nodeMap.get(node.getIp());
+                      assertNull(serverNode);
+                      nodeMap.put(node.getIp(), node);
+                    });
+              });
     }
-    strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
-    pra = strategy.assign(100, 1, 3, serverTags, -1, -1);
-    pra.getAssignments()
-        .values()
-        .forEach(
-            (nodeList) -> {
-              Map<String, ServerNode> nodeMap = new HashMap<>();
-              nodeList.forEach(
-                  (node) -> {
-                    ServerNode serverNode = nodeMap.get(node.getIp());
-                    assertNull(serverNode);
-                    nodeMap.put(node.getIp(), node);
-                  });
-            });
   }
 
   @Test
@@ -409,23 +417,26 @@ public class PartitionBalanceAssignmentStrategyTest {
     ssc.set(
         CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY,
         AbstractAssignmentStrategy.HostAssignmentStrategyName.NONE);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
-    AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
-    Set<String> serverTags = Sets.newHashSet("tag-1");
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      AssignmentStrategy strategy = new 
PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+      Set<String> serverTags = Sets.newHashSet("tag-1");
 
-    for (int i = 0; i < 3; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
-    for (int i = 0; i < 2; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 
- i, 0, serverTags));
+      for (int i = 0; i < 3; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      for (int i = 0; i < 2; ++i) {
+        clusterManager.add(
+            new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, 
-1, -1);
+      pra.getAssignments()
+          .values()
+          .forEach(
+              (nodeList) -> {
+                assertEquals(5, nodeList.size());
+              });
     }
-    PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, 
-1);
-    pra.getAssignments()
-        .values()
-        .forEach(
-            (nodeList) -> {
-              assertEquals(5, nodeList.size());
-            });
   }
 
   @Test
diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml
index aa4f5cd60..33b5c2844 100644
--- a/integration-test/common/pom.xml
+++ b/integration-test/common/pom.xml
@@ -173,6 +173,7 @@
                             <java.awt.headless>true</java.awt.headless>
                             
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
                             
<project.version>${project.version}</project.version>
+                            <test.mode>true</test.mode>
                         </systemProperties>
                         
<redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
                         <useFile>${test.redirectToFile}</useFile>
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index fcad6fa7e..a2fdce988 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -129,19 +129,20 @@ public class CoordinatorAssignmentTest extends 
CoordinatorTestBase {
     assertEquals(SHUFFLE_NODES_MAX, 
info.getServerToPartitionRanges().keySet().size());
 
     // Case2: Enable silent period mechanism, it should fallback to slave 
coordinator.
-    SimpleClusterManager clusterManager =
-        (SimpleClusterManager) coordinators.get(0).getClusterManager();
-    clusterManager.setReadyForServe(false);
-    clusterManager.setStartupSilentPeriodEnabled(true);
-    clusterManager.setStartTime(System.currentTimeMillis() - 1);
+    try (SimpleClusterManager clusterManager =
+        (SimpleClusterManager) coordinators.get(0).getClusterManager()) {
+      clusterManager.setReadyForServe(false);
+      clusterManager.setStartupSilentPeriodEnabled(true);
+      clusterManager.setStartTime(System.currentTimeMillis() - 1);
 
-    if (clusterManager.getNodesNum() < 10) {
-      info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, 
-1, -1);
-      assertEquals(SHUFFLE_NODES_MAX, 
info.getServerToPartitionRanges().keySet().size());
-    }
+      if (clusterManager.getNodesNum() < 10) {
+        info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, 
TAGS, -1, -1);
+        assertEquals(SHUFFLE_NODES_MAX, 
info.getServerToPartitionRanges().keySet().size());
+      }
 
-    // recover
-    clusterManager.setReadyForServe(true);
+      // recover
+      clusterManager.setReadyForServe(true);
+    }
   }
 
   @Test
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 4029d1ca6..9cfe686e5 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -54,8 +54,6 @@ import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorServer;
-import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -136,22 +134,7 @@ public class DiskErrorToleranceTest extends 
ShuffleReadWriteBase {
   public void closeClient() throws Exception {
     grpcShuffleServerClient.close();
     nettyShuffleServerClient.close();
-    cleanCluster();
-  }
-
-  public static void cleanCluster() throws Exception {
-    for (CoordinatorServer coordinator : coordinators) {
-      coordinator.stopServer();
-    }
-    for (ShuffleServer shuffleServer : grpcShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    for (ShuffleServer shuffleServer : nettyShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    grpcShuffleServers = Lists.newArrayList();
-    nettyShuffleServers = Lists.newArrayList();
-    coordinators = Lists.newArrayList();
+    shutdownServers();
   }
 
   private static Stream<Arguments> diskErrorTestProvider() {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 6662a3670..d6af2d06b 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -190,23 +190,12 @@ public class QuorumTest extends ShuffleReadWriteBase {
     Thread.sleep(2000);
   }
 
-  public static void cleanCluster() throws Exception {
-    for (CoordinatorServer coordinator : coordinators) {
-      coordinator.stopServer();
-    }
-    for (ShuffleServer shuffleServer : grpcShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    grpcShuffleServers = Lists.newArrayList();
-    coordinators = Lists.newArrayList();
-  }
-
   @AfterEach
   public void cleanEnv() throws Exception {
     if (shuffleWriteClientImpl != null) {
       shuffleWriteClientImpl.close();
     }
-    cleanCluster();
+    shutdownServers();
     // we need recovery `rpcTime`, or some unit tests may fail
     ((ShuffleServerGrpcClient)
             ShuffleServerClientFactory.getInstance()
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
index d082ae025..100b645e7 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RpcClientRetryTest.java
@@ -119,23 +119,12 @@ public class RpcClientRetryTest extends 
ShuffleReadWriteBase {
     }
   }
 
-  public static void cleanCluster() throws Exception {
-    for (CoordinatorServer coordinator : coordinators) {
-      coordinator.stopServer();
-    }
-    for (ShuffleServer shuffleServer : grpcShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    grpcShuffleServers = Lists.newArrayList();
-    coordinators = Lists.newArrayList();
-  }
-
   @AfterAll
   public static void cleanEnv() throws Exception {
     if (shuffleWriteClientImpl != null) {
       shuffleWriteClientImpl.close();
     }
-    cleanCluster();
+    shutdownServers();
   }
 
   private static Stream<Arguments> testRpcRetryLogicProvider() {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 6e4b7ca05..980199e1a 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -170,26 +170,27 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testLostNodesServlet() throws IOException {
-    SimpleClusterManager clusterManager =
-        (SimpleClusterManager) coordinatorServer.getClusterManager();
-    ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
-    ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
-    Map<String, ServerNode> servers = clusterManager.getServers();
-    servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() - 40000);
-    servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() - 40000);
-    clusterManager.nodesCheckTest();
-    List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(), shuffleServer4.getId());
-    List<String> shuffleIds = new ArrayList<>();
-    Response<List<HashMap<String, Object>>> response =
-        objectMapper.readValue(
-            TestUtils.httpGet(LOSTNODES_URL),
-            new TypeReference<Response<List<HashMap<String, Object>>>>() {});
-    List<HashMap<String, Object>> serverList = response.getData();
-    for (HashMap<String, Object> stringObjectHashMap : serverList) {
-      String shuffleId = (String) stringObjectHashMap.get("id");
-      shuffleIds.add(shuffleId);
+    try (SimpleClusterManager clusterManager =
+        (SimpleClusterManager) coordinatorServer.getClusterManager()) {
+      ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
+      ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+      Map<String, ServerNode> servers = clusterManager.getServers();
+      servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() - 40000);
+      servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() - 40000);
+      clusterManager.nodesCheckTest();
+      List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(), shuffleServer4.getId());
+      List<String> shuffleIds = new ArrayList<>();
+      Response<List<HashMap<String, Object>>> response =
+          objectMapper.readValue(
+              TestUtils.httpGet(LOSTNODES_URL),
+              new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+      List<HashMap<String, Object>> serverList = response.getData();
+      for (HashMap<String, Object> stringObjectHashMap : serverList) {
+        String shuffleId = (String) stringObjectHashMap.get("id");
+        shuffleIds.add(shuffleId);
+      }
+      assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds, shuffleIds));
     }
-    assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds, shuffleIds));
   }
 
   @Test
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 2eee21227..d08bf80bd 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -50,7 +50,6 @@ import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.server.MockedShuffleServer;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -107,7 +106,7 @@ public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
         (client) -> {
           client.close();
         });
-    cleanCluster();
+    shutdownServers();
   }
 
   private static Stream<Arguments> testReadFaultToleranceProvider() {
@@ -330,19 +329,4 @@ public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
       retry++;
     }
   }
-
-  public static void cleanCluster() throws Exception {
-    for (CoordinatorServer coordinator : coordinators) {
-      coordinator.stopServer();
-    }
-    for (ShuffleServer shuffleServer : grpcShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    for (ShuffleServer shuffleServer : nettyShuffleServers) {
-      shuffleServer.stopServer();
-    }
-    grpcShuffleServers = Lists.newArrayList();
-    nettyShuffleServers = Lists.newArrayList();
-    coordinators = Lists.newArrayList();
-  }
 }
diff --git a/integration-test/mr/pom.xml b/integration-test/mr/pom.xml
index 0b1b378e6..c050332e6 100644
--- a/integration-test/mr/pom.xml
+++ b/integration-test/mr/pom.xml
@@ -157,6 +157,7 @@
                             <java.awt.headless>true</java.awt.headless>
                             <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
                             <project.version>${project.version}</project.version>
+                            <test.mode>true</test.mode>
                         </systemProperties>
                         <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
                         <useFile>${test.redirectToFile}</useFile>
diff --git a/integration-test/spark-common/pom.xml b/integration-test/spark-common/pom.xml
index eea33dff1..b5b8d02ad 100644
--- a/integration-test/spark-common/pom.xml
+++ b/integration-test/spark-common/pom.xml
@@ -190,6 +190,7 @@
               <java.awt.headless>true</java.awt.headless>
               <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
               <project.version>${project.version}</project.version>
+              <test.mode>true</test.mode>
             </systemProperties>
             <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
             <useFile>${test.redirectToFile}</useFile>
diff --git a/integration-test/tez/pom.xml b/integration-test/tez/pom.xml
index a63b741b3..16a7ad4b5 100644
--- a/integration-test/tez/pom.xml
+++ b/integration-test/tez/pom.xml
@@ -162,6 +162,7 @@
               <java.awt.headless>true</java.awt.headless>
               <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
               <project.version>${project.version}</project.version>
+              <test.mode>true</test.mode>
             </systemProperties>
             <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
             <useFile>${test.redirectToFile}</useFile>
diff --git a/pom.xml b/pom.xml
index 69dc4d85d..07c877385 100644
--- a/pom.xml
+++ b/pom.xml
@@ -916,6 +916,7 @@
               <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
               <project.version>${project.version}</project.version>
               <jacoco-agent.destfile>target/jacoco.exec</jacoco-agent.destfile>
+              <test.mode>true</test.mode>
             </systemProperties>
             <redirectTestOutputToFile>${test.redirectToFile}</redirectTestOutputToFile>
             <useFile>${test.redirectToFile}</useFile>


Reply via email to