This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 417674d77 [#1675][FOLLOWUP] fix(test): Explicitly close resources to 
avoid unexcepted behaviors (#1740)
417674d77 is described below

commit 417674d779cc2654f7f0adca2c0a53a0da7f5b4d
Author: RickyMa <[email protected]>
AuthorDate: Tue May 28 13:57:51 2024 +0800

    [#1675][FOLLOWUP] fix(test): Explicitly close resources to avoid unexcepted 
behaviors (#1740)
    
    ### What changes were proposed in this pull request?
    
    Fix various flaky tests:
    1. After using `ApplicationManager`, release resources at once, which will 
let `ScheduledExecutorService` work background to cause unexpected behaviors.
    2. Release `DynamicClientConfService` after using it.
    3. Release `SimpleClusterManager` everytime after using it. Since the 
execution order of JUnit test methods is random, unless we specifically set the 
execution order. So it is better not to initialize the `SimpleClusterManager` 
object globally.
    4. Wait for a while to ensure the port is released in `UniffleJavaProcess`.
    5. Always call `stopServer` after `new CoordinatorServer()` to shut down 
multiple `ExecutorService` from running in background, which could cause 
unexpected behaviors.
    
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1675.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unnecessary.
---
 .../coordinator/ApplicationManagerTest.java        |   6 +
 .../uniffle/coordinator/CoordinatorServerTest.java |   7 +
 .../uniffle/coordinator/QuotaManagerTest.java      | 217 +++++-----
 .../coordinator/access/AccessManagerTest.java      |  91 ++--
 .../checker/AccessCandidatesCheckerTest.java       | 155 +++----
 .../checker/AccessClusterLoadCheckerTest.java      | 118 +++---
 .../checker/AccessQuotaCheckerTest.java            |  66 +--
 .../conf/DynamicClientConfServiceTest.java         |   7 +-
 .../assignment/BasicAssignmentStrategyTest.java    | 386 +++++++++--------
 .../PartitionBalanceAssignmentStrategyTest.java    | 470 +++++++++++----------
 .../AppBalanceSelectStorageStrategyTest.java       |   6 +
 ...owestIOSampleCostSelectStorageStrategyTest.java |   6 +
 .../coordinator/web/UniffleJavaProcess.java        |   5 +
 .../test/AccessCandidatesCheckerHadoopTest.java    | 161 +++----
 14 files changed, 886 insertions(+), 815 deletions(-)

diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 0a576eb3a..a307119e4 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -60,6 +61,11 @@ public class ApplicationManagerTest {
     applicationManager = new ApplicationManager(conf);
   }
 
+  @AfterEach
+  public void tearDown() {
+    applicationManager.close();
+  }
+
   @Test
   public void refreshTest() {
     String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + 
remotePath2;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
index 59e080a48..ef151207e 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
@@ -45,6 +45,9 @@ public class CoordinatorServerTest {
     } catch (Exception e) {
       assertEquals(expectMessage, e.getMessage());
       assertEquals(expectStatus, ((ExitException) e).getStatus());
+    } finally {
+      // Always call stopServer after new CoordinatorServer to shut down 
ExecutorService
+      cs2.stopServer();
     }
 
     coordinatorConf.setInteger("rss.jetty.http.port", 9529);
@@ -55,6 +58,10 @@ public class CoordinatorServerTest {
     } catch (Exception e) {
       assertEquals(expectMessage, e.getMessage());
       assertEquals(expectStatus, ((ExitException) e).getStatus());
+    } finally {
+      // Always call stopServer after new CoordinatorServer to shut down 
ExecutorService
+      cs2.stopServer();
+      cs1.stopServer();
     }
 
     final Thread t =
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index b7ce09177..41be903c2 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -61,17 +61,18 @@ public class QuotaManagerTest {
   public void testDetectUserResource() {
     CoordinatorConf conf = new CoordinatorConf();
     conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    Awaitility.await()
-        .timeout(5, TimeUnit.SECONDS)
-        .until(() -> applicationManager.getDefaultUserApps().size() > 2);
-
-    Integer user1 = applicationManager.getDefaultUserApps().get("user1");
-    Integer user2 = applicationManager.getDefaultUserApps().get("user2");
-    Integer user3 = applicationManager.getDefaultUserApps().get("user3");
-    assertEquals(user1, 10);
-    assertEquals(user2, 20);
-    assertEquals(user3, 30);
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      Awaitility.await()
+          .timeout(5, TimeUnit.SECONDS)
+          .until(() -> applicationManager.getDefaultUserApps().size() > 2);
+
+      Integer user1 = applicationManager.getDefaultUserApps().get("user1");
+      Integer user2 = applicationManager.getDefaultUserApps().get("user2");
+      Integer user3 = applicationManager.getDefaultUserApps().get("user3");
+      assertEquals(user1, 10);
+      assertEquals(user2, 20);
+      assertEquals(user3, 30);
+    }
   }
 
   @Test
@@ -82,10 +83,11 @@ public class QuotaManagerTest {
         CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
         Lists.newArrayList(
             
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    // it didn't detectUserResource because 
`org.apache.unifle.coordinator.AccessQuotaChecker` is
-    // not configured
-    assertNull(applicationManager.getQuotaManager());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      // it didn't detectUserResource because 
`org.apache.unifle.coordinator.AccessQuotaChecker` is
+      // not configured
+      assertNull(applicationManager.getQuotaManager());
+    }
   }
 
   @Test
@@ -93,34 +95,35 @@ public class QuotaManagerTest {
     CoordinatorConf conf = new CoordinatorConf();
     conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
     conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 5);
-    final ApplicationManager applicationManager = new ApplicationManager(conf);
-    final AtomicInteger uuid = new AtomicInteger();
-    Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
-    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
-    final int i1 = uuid.incrementAndGet();
-    uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
-    Map<String, Long> appAndTime =
-        applicationManager
-            .getQuotaManager()
-            .getCurrentUserAndApp()
-            .computeIfAbsent("user4", x -> uuidAndTime);
-    // This thread may remove the uuid and put the appId in.
-    final Thread registerThread =
-        new Thread(
-            () ->
-                applicationManager
-                    .getQuotaManager()
-                    .registerApplicationInfo("application_test_" + i1, 
appAndTime));
-    registerThread.start();
-    final boolean icCheck =
-        applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i1));
-    registerThread.join();
-    assertTrue(icCheck);
-    assertEquals(
-        
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
 5);
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      final AtomicInteger uuid = new AtomicInteger();
+      Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
+      uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(uuid.incrementAndGet()), 
System.currentTimeMillis());
+      final int i1 = uuid.incrementAndGet();
+      uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+      Map<String, Long> appAndTime =
+          applicationManager
+              .getQuotaManager()
+              .getCurrentUserAndApp()
+              .computeIfAbsent("user4", x -> uuidAndTime);
+      // This thread may remove the uuid and put the appId in.
+      final Thread registerThread =
+          new Thread(
+              () ->
+                  applicationManager
+                      .getQuotaManager()
+                      .registerApplicationInfo("application_test_" + i1, 
appAndTime));
+      registerThread.start();
+      final boolean icCheck =
+          applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i1));
+      registerThread.join();
+      assertTrue(icCheck);
+      assertEquals(
+          
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
 5);
+    }
   }
 
   @Test
@@ -131,75 +134,77 @@ public class QuotaManagerTest {
     conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
     conf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 1500);
     conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 2);
-    final ApplicationManager applicationManager = new ApplicationManager(conf);
-    final AtomicInteger uuid = new AtomicInteger();
-    final int i1 = uuid.incrementAndGet();
-    final int i2 = uuid.incrementAndGet();
-    final int i3 = uuid.incrementAndGet();
-    final int i4 = uuid.incrementAndGet();
-    Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
-    uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
-    uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
-    final boolean icCheck =
-        applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i1));
-    final boolean icCheck2 =
-        applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i2));
-    final boolean icCheck3 =
-        applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i3));
-    final boolean icCheck4 =
-        applicationManager.getQuotaManager().checkQuota("user3", 
String.valueOf(i4));
-    assertFalse(icCheck);
-    assertFalse(icCheck2);
-    // The default number of tasks submitted is 2, and the third will be 
rejected
-    assertTrue(icCheck3);
-    assertFalse(icCheck4);
-    assertEquals(
-        
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
 2);
-    
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(), 
2);
-    
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(), 
1);
-    await()
-        .atMost(2, TimeUnit.SECONDS)
-        .until(
-            () -> {
-              applicationManager.statusCheck();
-              // If the number of apps corresponding to this user is 0, remove 
this user
-              return 
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
-                  && 
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
-            });
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      final AtomicInteger uuid = new AtomicInteger();
+      final int i1 = uuid.incrementAndGet();
+      final int i2 = uuid.incrementAndGet();
+      final int i3 = uuid.incrementAndGet();
+      final int i4 = uuid.incrementAndGet();
+      Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
+      uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
+      uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
+      final boolean icCheck =
+          applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i1));
+      final boolean icCheck2 =
+          applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i2));
+      final boolean icCheck3 =
+          applicationManager.getQuotaManager().checkQuota("user4", 
String.valueOf(i3));
+      final boolean icCheck4 =
+          applicationManager.getQuotaManager().checkQuota("user3", 
String.valueOf(i4));
+      assertFalse(icCheck);
+      assertFalse(icCheck2);
+      // The default number of tasks submitted is 2, and the third will be 
rejected
+      assertTrue(icCheck3);
+      assertFalse(icCheck4);
+      assertEquals(
+          
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
 2);
+      
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(), 
2);
+      
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(), 
1);
+      await()
+          .atMost(2, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                applicationManager.statusCheck();
+                // If the number of apps corresponding to this user is 0, 
remove this user
+                return 
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
+                    && 
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
+              });
+    }
   }
 
   @Test
   public void testCheckQuotaWithDefault() {
     CoordinatorConf conf = new CoordinatorConf();
     conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
-    final ApplicationManager applicationManager = new ApplicationManager(conf);
-    Awaitility.await()
-        .timeout(5, TimeUnit.SECONDS)
-        .until(() -> applicationManager.getDefaultUserApps().size() > 2);
-
-    QuotaManager quotaManager = applicationManager.getQuotaManager();
-    Map<String, Map<String, Long>> currentUserAndApp = 
quotaManager.getCurrentUserAndApp();
-
-    currentUserAndApp.computeIfAbsent("user1", x -> mockUUidAppAndTime(30));
-    currentUserAndApp.computeIfAbsent("user2", x -> mockUUidAppAndTime(20));
-    currentUserAndApp.computeIfAbsent("user3", x -> mockUUidAppAndTime(29));
-    currentUserAndApp.computeIfAbsent("disable_quota_user1", x -> 
mockUUidAppAndTime(100));
-    currentUserAndApp.computeIfAbsent("blank_user1", x -> 
mockUUidAppAndTime(0));
-
-    assertEquals(currentUserAndApp.get("user1").size(), 30);
-    assertEquals(currentUserAndApp.get("user2").size(), 20);
-    assertEquals(currentUserAndApp.get("user3").size(), 29);
-    assertEquals(currentUserAndApp.get("disable_quota_user1").size(), 100);
-    assertEquals(currentUserAndApp.get("blank_user1").size(), 0);
-
-    assertTrue(quotaManager.checkQuota("user1", mockUUidAppId()));
-    assertTrue(quotaManager.checkQuota("user2", mockUUidAppId()));
-    assertFalse(quotaManager.checkQuota("user3", mockUUidAppId()));
-    assertTrue(quotaManager.checkQuota("user3", mockUUidAppId()));
-    assertFalse(quotaManager.checkQuota("disable_quota_user1", 
mockUUidAppId()));
-    assertTrue(quotaManager.checkQuota("blank_user1", mockUUidAppId()));
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      Awaitility.await()
+          .timeout(5, TimeUnit.SECONDS)
+          .until(() -> applicationManager.getDefaultUserApps().size() > 2);
+
+      QuotaManager quotaManager = applicationManager.getQuotaManager();
+      Map<String, Map<String, Long>> currentUserAndApp = 
quotaManager.getCurrentUserAndApp();
+
+      currentUserAndApp.computeIfAbsent("user1", x -> mockUUidAppAndTime(30));
+      currentUserAndApp.computeIfAbsent("user2", x -> mockUUidAppAndTime(20));
+      currentUserAndApp.computeIfAbsent("user3", x -> mockUUidAppAndTime(29));
+      currentUserAndApp.computeIfAbsent("disable_quota_user1", x -> 
mockUUidAppAndTime(100));
+      currentUserAndApp.computeIfAbsent("blank_user1", x -> 
mockUUidAppAndTime(0));
+
+      assertEquals(currentUserAndApp.get("user1").size(), 30);
+      assertEquals(currentUserAndApp.get("user2").size(), 20);
+      assertEquals(currentUserAndApp.get("user3").size(), 29);
+      assertEquals(currentUserAndApp.get("disable_quota_user1").size(), 100);
+      assertEquals(currentUserAndApp.get("blank_user1").size(), 0);
+
+      assertTrue(quotaManager.checkQuota("user1", mockUUidAppId()));
+      assertTrue(quotaManager.checkQuota("user2", mockUUidAppId()));
+      assertFalse(quotaManager.checkQuota("user3", mockUUidAppId()));
+      assertTrue(quotaManager.checkQuota("user3", mockUUidAppId()));
+      assertFalse(quotaManager.checkQuota("disable_quota_user1", 
mockUUidAppId()));
+      assertTrue(quotaManager.checkQuota("blank_user1", mockUUidAppId()));
+    }
   }
 
   private String mockUUidAppId() {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
index bec781320..350982c6e 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
@@ -53,53 +53,54 @@ public class AccessManagerTest {
     // test init
     CoordinatorConf conf = new CoordinatorConf();
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      String expectedMessage = "Empty classes";
-      assertTrue(e.getMessage().startsWith(expectedMessage));
-    }
-    conf.setString(
-        CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-        
"com.Dummy,org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessChecker");
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
-      assertTrue(e.getMessage().startsWith(expectedMessage));
-    }
-    // test empty checkers
-    conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
-    AccessManager accessManager =
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      try {
         new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
-    assertTrue(
-        accessManager
-            .handleAccessRequest(
-                new AccessInfo(
-                    String.valueOf(new Random().nextInt()),
-                    Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
-                    Collections.emptyMap(),
-                    "user"))
-            .isSuccess());
-    accessManager.close();
-    // test mock checkers
-    String alwaysTrueClassName = MockAccessCheckerAlwaysTrue.class.getName();
-    conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), 
alwaysTrueClassName + ",");
-    accessManager =
+      } catch (RuntimeException e) {
+        String expectedMessage = "Empty classes";
+        assertTrue(e.getMessage().startsWith(expectedMessage));
+      }
+      conf.setString(
+          CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
+          
"com.Dummy,org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessChecker");
+      try {
         new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
-    assertEquals(1, accessManager.getAccessCheckers().size());
-    assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
-    assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock2")).isSuccess());
-    String alwaysFalseClassName = MockAccessCheckerAlwaysFalse.class.getName();
-    conf.setString(
-        CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-        alwaysTrueClassName + "," + alwaysFalseClassName);
-    accessManager =
-        new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
-    assertEquals(2, accessManager.getAccessCheckers().size());
-    assertFalse(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
-    accessManager.close();
+      } catch (RuntimeException e) {
+        String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
+        assertTrue(e.getMessage().startsWith(expectedMessage));
+      }
+      // test empty checkers
+      conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
+      AccessManager accessManager =
+          new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      assertTrue(
+          accessManager
+              .handleAccessRequest(
+                  new AccessInfo(
+                      String.valueOf(new Random().nextInt()),
+                      Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+                      Collections.emptyMap(),
+                      "user"))
+              .isSuccess());
+      accessManager.close();
+      // test mock checkers
+      String alwaysTrueClassName = MockAccessCheckerAlwaysTrue.class.getName();
+      conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), 
alwaysTrueClassName + ",");
+      accessManager =
+          new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      assertEquals(1, accessManager.getAccessCheckers().size());
+      assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
+      assertTrue(accessManager.handleAccessRequest(new 
AccessInfo("mock2")).isSuccess());
+      String alwaysFalseClassName = 
MockAccessCheckerAlwaysFalse.class.getName();
+      conf.setString(
+          CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
+          alwaysTrueClassName + "," + alwaysFalseClassName);
+      accessManager =
+          new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      assertEquals(2, accessManager.getAccessCheckers().size());
+      assertFalse(accessManager.handleAccessRequest(new 
AccessInfo("mock1")).isSuccess());
+      accessManager.close();
+    }
   }
 
   public static class MockAccessCheckerAlwaysTrue extends 
AbstractAccessChecker {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
index 86bde900e..2b8ba8d55 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
@@ -66,86 +66,87 @@ public class AccessCandidatesCheckerTest {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
tempDir.toURI().toString());
     String checkerClassName = AccessCandidatesChecker.class.getName();
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), 
checkerClassName);
-    final ApplicationManager applicationManager = new ApplicationManager(conf);
-    // file load checking at startup
-    Exception expectedException = null;
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      expectedException = e;
-    }
-    assertNotNull(expectedException);
-    assertTrue(
-        expectedException
-            .getMessage()
-            .contains(
-                "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
-    conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
cfgFile.toURI().toString());
-    expectedException = null;
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      expectedException = e;
-    }
-    assertNotNull(expectedException);
-    assertTrue(
-        expectedException
-            .getMessage()
-            .contains(
-                "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
-
-    // load the config at the beginning
-    FileWriter fileWriter = new FileWriter(cfgFile);
-    PrintWriter printWriter = new PrintWriter(fileWriter);
-    printWriter.println("9527");
-    printWriter.println(" 135 ");
-    printWriter.println("2 ");
-    printWriter.flush();
-    printWriter.close();
-    AccessManager accessManager =
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      // file load checking at startup
+      Exception expectedException = null;
+      try {
         new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
-    AccessCandidatesChecker checker =
-        (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
-    sleep(1200);
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      } catch (RuntimeException e) {
+        expectedException = e;
+      }
+      assertNotNull(expectedException);
+      assertTrue(
+          expectedException
+              .getMessage()
+              .contains(
+                  "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+      conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
cfgFile.toURI().toString());
+      expectedException = null;
+      try {
+        new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      } catch (RuntimeException e) {
+        expectedException = e;
+      }
+      assertNotNull(expectedException);
+      assertTrue(
+          expectedException
+              .getMessage()
+              .contains(
+                  "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+
+      // load the config at the beginning
+      FileWriter fileWriter = new FileWriter(cfgFile);
+      PrintWriter printWriter = new PrintWriter(fileWriter);
+      printWriter.println("9527");
+      printWriter.println(" 135 ");
+      printWriter.println("2 ");
+      printWriter.flush();
+      printWriter.close();
+      AccessManager accessManager =
+          new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      AccessCandidatesChecker checker =
+          (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
+      sleep(1200);
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // ignore empty or wrong content
-    printWriter.println("");
-    printWriter.flush();
-    printWriter.close();
-    sleep(1300);
-    assertTrue(cfgFile.exists());
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      // ignore empty or wrong content
+      printWriter.println("");
+      printWriter.flush();
+      printWriter.close();
+      sleep(1300);
+      assertTrue(cfgFile.exists());
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // the config will not be changed when the conf file is deleted
-    assertTrue(cfgFile.delete());
-    sleep(1200);
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      // the config will not be changed when the conf file is deleted
+      assertTrue(cfgFile.delete());
+      sleep(1200);
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // the normal update config process, move the new conf file to the old one
-    File cfgFileTmp = new File(cfgFileName + ".tmp");
-    fileWriter = new FileWriter(cfgFileTmp);
-    printWriter = new PrintWriter(fileWriter);
-    printWriter.println("13");
-    printWriter.println("57");
-    printWriter.close();
-    FileUtils.moveFile(cfgFileTmp, cfgFile);
-    sleep(1200);
-    assertEquals(Sets.newHashSet("13", "57"), checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("13")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("57")).isSuccess());
-    checker.close();
+      // the normal update config process, move the new conf file to the old 
one
+      File cfgFileTmp = new File(cfgFileName + ".tmp");
+      fileWriter = new FileWriter(cfgFileTmp);
+      printWriter = new PrintWriter(fileWriter);
+      printWriter.println("13");
+      printWriter.println("57");
+      printWriter.close();
+      FileUtils.moveFile(cfgFileTmp, cfgFile);
+      sleep(1200);
+      assertEquals(Sets.newHashSet("13", "57"), checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("13")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("57")).isSuccess());
+      checker.close();
+    }
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
index a2e697e73..de6a2d420 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
@@ -80,46 +80,47 @@ public class AccessClusterLoadCheckerTest {
     conf.set(COORDINATOR_ACCESS_CHECKERS, 
Collections.singletonList(clusterLoaderCheckerName));
     conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
     conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    AccessManager accessManager =
-        new AccessManager(
-            conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
-
-    AccessClusterLoadChecker accessClusterLoadChecker =
-        (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
-
-    /**
-     * case1: when setting the invalid required shuffle nodes number of job 
and available servers
-     * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
-     */
-    Map<String, String> properties = new HashMap<>();
-    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
-    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
-    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
-    /**
-     * case2: when setting the valid required shuffle nodes number of job and 
available servers
-     * greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
-     */
-    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
-    assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
-    /**
-     * case3: when setting the valid required shuffle nodes number of job and 
available servers less
-     * than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
-     */
-    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
-    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
-    /**
-     * case4: when the required shuffle nodes number is not specified in 
access info, it should use
-     * the default shuffle nodes max from coordinator conf.
-     */
-    properties = new HashMap<>();
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
-    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      AccessManager accessManager =
+          new AccessManager(
+              conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
+
+      AccessClusterLoadChecker accessClusterLoadChecker =
+          (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
+
+      /**
+       * case1: when setting the invalid required shuffle nodes number of job 
and available servers
+       * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+       */
+      Map<String, String> properties = new HashMap<>();
+      properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
+      AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
+      assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+      /**
+       * case2: when setting the valid required shuffle nodes number of job 
and available servers
+       * greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
+       */
+      properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
+      accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+      assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+      /**
+       * case3: when setting the valid required shuffle nodes number of job 
and available servers
+       * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+       */
+      properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+      accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+      assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+      /**
+       * case4: when the required shuffle nodes number is not specified in 
access info, it should
+       * use the default shuffle nodes max from coordinator conf.
+       */
+      properties = new HashMap<>();
+      accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+      assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+    }
   }
 
   @Test
@@ -133,23 +134,24 @@ public class AccessClusterLoadCheckerTest {
             .getFile();
     CoordinatorConf conf = new CoordinatorConf(filePath);
     conf.setString(COORDINATOR_ACCESS_CHECKERS.key(), 
clusterLoaderCheckerName);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    AccessManager accessManager =
-        new AccessManager(
-            conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
-    AccessClusterLoadChecker accessClusterLoadChecker =
-        (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
-    when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
-    assertFalse(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
-    assertEquals(2, accessClusterLoadChecker.getAvailableServerNumThreshold());
-    assertEquals(0, 
Double.compare(accessClusterLoadChecker.getMemoryPercentThreshold(), 20.0));
-    ServerNode node2 = new ServerNode("1", "1", 0, 90, 40, 10, 0, null);
-    serverNodeList.add(node2);
-    ServerNode node3 = new ServerNode("1", "1", 0, 80, 25, 20, 0, null);
-    serverNodeList.add(node3);
-    assertFalse(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
-    ServerNode node4 = new ServerNode("1", "1", 0, 75, 25, 25, 0, null);
-    serverNodeList.add(node4);
-    assertTrue(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      AccessManager accessManager =
+          new AccessManager(
+              conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
+      AccessClusterLoadChecker accessClusterLoadChecker =
+          (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
+      when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
+      assertFalse(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
+      assertEquals(2, 
accessClusterLoadChecker.getAvailableServerNumThreshold());
+      assertEquals(0, 
Double.compare(accessClusterLoadChecker.getMemoryPercentThreshold(), 20.0));
+      ServerNode node2 = new ServerNode("1", "1", 0, 90, 40, 10, 0, null);
+      serverNodeList.add(node2);
+      ServerNode node3 = new ServerNode("1", "1", 0, 80, 25, 20, 0, null);
+      serverNodeList.add(node3);
+      assertFalse(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
+      ServerNode node4 = new ServerNode("1", "1", 0, 75, 25, 25, 0, null);
+      serverNodeList.add(node4);
+      assertTrue(accessClusterLoadChecker.check(new 
AccessInfo("test")).isSuccess());
+    }
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
index 8d74c773e..5a2a23d11 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
@@ -76,37 +76,39 @@ public class AccessQuotaCheckerTest {
     conf.set(
         COORDINATOR_ACCESS_CHECKERS, 
Collections.singletonList(AccessQuotaChecker.class.getName()));
     conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    AccessManager accessManager =
-        new AccessManager(
-            conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
-
-    AccessQuotaChecker accessQuotaChecker =
-        (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+    Map<String, String> properties = new HashMap<>();
 
     /**
      * case1: when user set default app num is 5, and commit 6 app which 
current app num is greater
      * than default app num, it will reject 1 app and return false.
      */
-    Map<String, String> properties = new HashMap<>();
-    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
-    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
-    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
-    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
-    assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      AccessManager accessManager =
+          new AccessManager(
+              conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
+      AccessQuotaChecker accessQuotaChecker =
+          (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+      AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
+      assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+      assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+      assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+      assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+    }
 
     /**
      * case2: when setting the valid required shuffle nodes number of job and 
available servers
      * greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
      */
     conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 0);
-    applicationManager = new ApplicationManager(conf);
-    accessManager =
-        new AccessManager(
-            conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
-    accessQuotaChecker = (AccessQuotaChecker) 
accessManager.getAccessCheckers().get(0);
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
-    assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      AccessManager accessManager =
+          new AccessManager(
+              conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
+      AccessQuotaChecker accessQuotaChecker =
+          (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+      AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
+      assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+    }
 
     /**
      * case3: when setting two checkers and the valid required shuffle nodes 
number of job and
@@ -118,16 +120,18 @@ public class AccessQuotaCheckerTest {
         Arrays.asList(
             "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker",
             
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
-    applicationManager = new ApplicationManager(conf);
-    accessManager =
-        new AccessManager(
-            conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
-    accessQuotaChecker = (AccessQuotaChecker) 
accessManager.getAccessCheckers().get(0);
-    final AccessClusterLoadChecker accessClusterLoadChecker =
-        (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
-    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
-    accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
-    assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
-    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      AccessManager accessManager =
+          new AccessManager(
+              conf, clusterManager, applicationManager.getQuotaManager(), new 
Configuration());
+      AccessQuotaChecker accessQuotaChecker =
+          (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+      final AccessClusterLoadChecker accessClusterLoadChecker =
+          (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
+      properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+      AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), 
properties, "user");
+      assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+      assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+    }
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
index 8a117b86d..790cec23b 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
@@ -63,10 +63,15 @@ public class DynamicClientConfServiceTest {
 
     // file load checking at startup
     Exception expectedException = null;
+    DynamicClientConfService dynamicClientConfService = null;
     try {
-      new DynamicClientConfService(conf, new Configuration());
+      dynamicClientConfService = new DynamicClientConfService(conf, new 
Configuration());
     } catch (RuntimeException e) {
       expectedException = e;
+    } finally {
+      if (dynamicClientConfService != null) {
+        dynamicClientConfService.close();
+      }
     }
     assertNotNull(expectedException);
     assertTrue(expectedException.getMessage().endsWith("is not a file."));
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
index 95db4cb43..fd03f19d5 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.coordinator.strategy.assignment;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -29,8 +28,6 @@ import java.util.stream.Collectors;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.PartitionRange;
@@ -46,185 +43,202 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class BasicAssignmentStrategyTest {
 
   Set<String> tags = Sets.newHashSet("test");
-  private SimpleClusterManager clusterManager;
-  private BasicAssignmentStrategy strategy;
   private int shuffleNodesMax = 7;
 
-  @BeforeEach
-  public void setUp() throws Exception {
+  @Test
+  public void testAssign() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.set(
         CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
         AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    strategy = new BasicAssignmentStrategy(clusterManager, ssc);
-  }
-
-  @AfterEach
-  public void tearDown() throws IOException {
-    clusterManager.clear();
-    clusterManager.close();
-  }
-
-  @Test
-  public void testAssign() {
-    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(
-          new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i, 
0, tags));
-    }
-
-    PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
-    assertEquals(10, assignments.size());
-
-    for (int i = 0; i < 100; i += 10) {
-      assertTrue(assignments.containsKey(new PartitionRange(i, i + 10)));
-    }
-
-    int i = 0;
-    Iterator<List<ServerNode>> ite = assignments.values().iterator();
-    while (ite.hasNext()) {
-      List<ServerNode> cur = ite.next();
-      assertEquals(2, cur.size());
-      assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(0).getId());
-      i++;
-      assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(1).getId());
-      i++;
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      BasicAssignmentStrategy strategy = new 
BasicAssignmentStrategy(clusterManager, ssc);
+
+      for (int i = 0; i < 20; ++i) {
+        clusterManager.add(
+            new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i, 
0, tags));
+      }
+
+      PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
+      assertEquals(10, assignments.size());
+
+      for (int i = 0; i < 100; i += 10) {
+        assertTrue(assignments.containsKey(new PartitionRange(i, i + 10)));
+      }
+
+      int i = 0;
+      Iterator<List<ServerNode>> ite = assignments.values().iterator();
+      while (ite.hasNext()) {
+        List<ServerNode> cur = ite.next();
+        assertEquals(2, cur.size());
+        assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(0).getId());
+        i++;
+        assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(1).getId());
+        i++;
+      }
     }
   }
 
   @Test
-  public void testRandomAssign() {
-    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 
0, 0, 0, 0, tags));
-    }
-    PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
-    Set<ServerNode> serverNodes1 = Sets.newHashSet();
-    for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
-      serverNodes1.addAll(assignment.getValue());
-    }
-
-    pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    assignments = pra.getAssignments();
-    Set<ServerNode> serverNodes2 = Sets.newHashSet();
-    for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
-      serverNodes2.addAll(assignment.getValue());
+  public void testRandomAssign() throws Exception {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.set(
+        CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+        AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+    ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      BasicAssignmentStrategy strategy = new 
BasicAssignmentStrategy(clusterManager, ssc);
+      for (int i = 0; i < 20; ++i) {
+        clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 
0, 0, 0, 0, 0, tags));
+      }
+      PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
+      Set<ServerNode> serverNodes1 = Sets.newHashSet();
+      for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
+        serverNodes1.addAll(assignment.getValue());
+      }
+
+      pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      assignments = pra.getAssignments();
+      Set<ServerNode> serverNodes2 = Sets.newHashSet();
+      for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
+        serverNodes2.addAll(assignment.getValue());
+      }
+
+      // test for the random node pick, there is a little possibility failed
+      assertFalse(serverNodes1.containsAll(serverNodes2));
     }
-
-    // test for the random node pick, there is a little possibility failed
-    assertFalse(serverNodes1.containsAll(serverNodes2));
   }
 
   @Test
-  public void testAssignWithDifferentNodeNum() {
-    final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0, 20, 0, tags);
-    final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0, 10, 0, tags);
-    final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0, 0, 0, tags);
-
-    clusterManager.add(sn1);
-    PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    // nodeNum < replica
-    assertNull(pra.getAssignments());
-
-    // nodeNum = replica
-    clusterManager.add(sn2);
-    pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
-    Set<ServerNode> serverNodes = Sets.newHashSet();
-    for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
-      serverNodes.addAll(assignment.getValue());
-    }
-    assertEquals(2, serverNodes.size());
-    assertTrue(serverNodes.contains(sn1));
-    assertTrue(serverNodes.contains(sn2));
-
-    // nodeNum > replica & nodeNum < shuffleNodesMax
-    clusterManager.add(sn3);
-    pra = strategy.assign(100, 10, 2, tags, -1, -1);
-    assignments = pra.getAssignments();
-    serverNodes = Sets.newHashSet();
-    for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
-      serverNodes.addAll(assignment.getValue());
+  public void testAssignWithDifferentNodeNum() throws Exception {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.set(
+        CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+        AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+    ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      BasicAssignmentStrategy strategy = new 
BasicAssignmentStrategy(clusterManager, ssc);
+
+      final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0, 20, 0, tags);
+      final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0, 10, 0, tags);
+      final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0, 0, 0, tags);
+
+      clusterManager.add(sn1);
+      PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      // nodeNum < replica
+      assertNull(pra.getAssignments());
+
+      // nodeNum = replica
+      clusterManager.add(sn2);
+      pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      SortedMap<PartitionRange, List<ServerNode>> assignments = 
pra.getAssignments();
+      Set<ServerNode> serverNodes = Sets.newHashSet();
+      for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
+        serverNodes.addAll(assignment.getValue());
+      }
+      assertEquals(2, serverNodes.size());
+      assertTrue(serverNodes.contains(sn1));
+      assertTrue(serverNodes.contains(sn2));
+
+      // nodeNum > replica & nodeNum < shuffleNodesMax
+      clusterManager.add(sn3);
+      pra = strategy.assign(100, 10, 2, tags, -1, -1);
+      assignments = pra.getAssignments();
+      serverNodes = Sets.newHashSet();
+      for (Map.Entry<PartitionRange, List<ServerNode>> assignment : 
assignments.entrySet()) {
+        serverNodes.addAll(assignment.getValue());
+      }
+      assertEquals(3, serverNodes.size());
+      assertTrue(serverNodes.contains(sn1));
+      assertTrue(serverNodes.contains(sn2));
+      assertTrue(serverNodes.contains(sn3));
     }
-    assertEquals(3, serverNodes.size());
-    assertTrue(serverNodes.contains(sn1));
-    assertTrue(serverNodes.contains(sn2));
-    assertTrue(serverNodes.contains(sn3));
   }
 
   @Test
-  public void testAssignmentShuffleNodesNum() {
-    Set<String> serverTags = Sets.newHashSet("tag-1");
-
-    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
-
-    /**
-     * case1: user specify the illegal shuffle node num(<0) it will use the 
default shuffle nodes
-     * num when having enough servers.
-     */
-    PartitionRangeAssignment pra = strategy.assign(100, 10, 1, serverTags, -1, 
-1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case2: user specify the illegal shuffle node num(==0) it will use the 
default shuffle nodes
-     * num when having enough servers.
-     */
-    pra = strategy.assign(100, 10, 1, serverTags, 0, -1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case3: user specify the illegal shuffle node num(>default max 
limitation) it will use the
-     * default shuffle nodes num when having enough servers
-     */
-    pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax + 10, -1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case4: user specify the legal shuffle node num, it will use the 
customized shuffle nodes num
-     * when having enough servers
-     */
-    pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax - 1, -1);
-    assertEquals(
-        shuffleNodesMax - 1,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case5: user specify the legal shuffle node num, but cluster don't have 
enough servers, it
-     * will return the remaining servers.
-     */
-    serverTags = Sets.newHashSet("tag-2");
-    for (int i = 0; i < shuffleNodesMax - 1; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, 20 - i, 0, 
serverTags));
+  public void testAssignmentShuffleNodesNum() throws Exception {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.set(
+        CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+        AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+    ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      BasicAssignmentStrategy strategy = new 
BasicAssignmentStrategy(clusterManager, ssc);
+
+      Set<String> serverTags = Sets.newHashSet("tag-1");
+
+      for (int i = 0; i < 20; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+
+      /**
+       * case1: user specify the illegal shuffle node num(<0) it will use the 
default shuffle nodes
+       * num when having enough servers.
+       */
+      PartitionRangeAssignment pra = strategy.assign(100, 10, 1, serverTags, 
-1, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case2: user specify the illegal shuffle node num(==0) it will use the 
default shuffle nodes
+       * num when having enough servers.
+       */
+      pra = strategy.assign(100, 10, 1, serverTags, 0, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case3: user specify the illegal shuffle node num(>default max 
limitation) it will use the
+       * default shuffle nodes num when having enough servers
+       */
+      pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax + 10, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case4: user specify the legal shuffle node num, it will use the 
customized shuffle nodes
+       * num when having enough servers
+       */
+      pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax - 1, -1);
+      assertEquals(
+          shuffleNodesMax - 1,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case5: user specify the legal shuffle node num, but cluster don't 
have enough servers, it
+       * will return the remaining servers.
+       */
+      serverTags = Sets.newHashSet("tag-2");
+      for (int i = 0; i < shuffleNodesMax - 1; ++i) {
+        clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
+      assertEquals(
+          shuffleNodesMax - 1,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
     }
-    pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
-    assertEquals(
-        shuffleNodesMax - 1,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
   }
 
   @Test
@@ -234,31 +248,33 @@ public class BasicAssignmentStrategyTest {
         CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
         AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    strategy = new BasicAssignmentStrategy(clusterManager, ssc);
-    List<Long> list =
-        Lists.newArrayList(
-            20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 
20L, 20L, 20L, 20L,
-            20L, 20L, 20L);
-    updateServerResource(list);
-    PartitionRangeAssignment assignment = strategy.assign(100, 1, 2, tags, 5, 
20);
-    List<Long> expect = Lists.newArrayList(40L, 40L, 40L, 40L, 40L);
-    valid(expect, assignment.getAssignments());
-
-    assignment = strategy.assign(28, 1, 2, tags, 5, 20);
-    expect = Lists.newArrayList(11L, 12L, 12L, 11L, 10L);
-    valid(expect, assignment.getAssignments());
-
-    assignment = strategy.assign(29, 1, 2, tags, 5, 4);
-    expect = Lists.newArrayList(11L, 12L, 12L, 12L, 11L);
-    valid(expect, assignment.getAssignments());
-
-    assignment = strategy.assign(29, 2, 2, tags, 5, 4);
-    expect = Lists.newArrayList(12L, 12L, 12L, 12L, 12L);
-    valid(expect, assignment.getAssignments());
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      BasicAssignmentStrategy strategy = new 
BasicAssignmentStrategy(clusterManager, ssc);
+
+      List<Long> list =
+          Lists.newArrayList(
+              20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 
20L, 20L, 20L, 20L,
+              20L, 20L, 20L);
+      updateServerResource(clusterManager, list);
+      PartitionRangeAssignment assignment = strategy.assign(100, 1, 2, tags, 
5, 20);
+      List<Long> expect = Lists.newArrayList(40L, 40L, 40L, 40L, 40L);
+      valid(expect, assignment.getAssignments());
+
+      assignment = strategy.assign(28, 1, 2, tags, 5, 20);
+      expect = Lists.newArrayList(11L, 12L, 12L, 11L, 10L);
+      valid(expect, assignment.getAssignments());
+
+      assignment = strategy.assign(29, 1, 2, tags, 5, 4);
+      expect = Lists.newArrayList(11L, 12L, 12L, 12L, 11L);
+      valid(expect, assignment.getAssignments());
+
+      assignment = strategy.assign(29, 2, 2, tags, 5, 4);
+      expect = Lists.newArrayList(12L, 12L, 12L, 12L, 12L);
+      valid(expect, assignment.getAssignments());
+    }
   }
 
-  void updateServerResource(List<Long> resources) {
+  void updateServerResource(SimpleClusterManager clusterManager, List<Long> 
resources) {
     for (int i = 0; i < resources.size(); i++) {
       ServerNode node =
           new ServerNode(
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index 2d80c65a4..60d6cb873 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.coordinator.strategy.assignment;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -31,8 +30,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -46,149 +43,150 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class PartitionBalanceAssignmentStrategyTest {
 
-  private SimpleClusterManager clusterManager;
-  private PartitionBalanceAssignmentStrategy strategy;
   private int shuffleNodesMax = 5;
   private Set<String> tags = Sets.newHashSet("test");
 
-  @BeforeEach
-  public void setUp() throws Exception {
+  @Test
+  public void testAssign() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.set(
         CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
         AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
-  }
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      PartitionBalanceAssignmentStrategy strategy =
+          new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
 
-  @Test
-  public void testAssign() {
-    List<Long> list = Lists.newArrayList();
-    for (int i = 0; i < 20; i++) {
-      list.add(10L);
-    }
-    updateServerResource(list);
-    boolean isThrown = false;
-    try {
-      strategy.assign(100, 2, 1, tags, -1, -1);
-    } catch (Exception e) {
-      isThrown = true;
-    }
-    assertTrue(isThrown);
-    try {
-      strategy.assign(0, 1, 1, tags, -1, -1);
-    } catch (Exception e) {
-      fail();
-    }
-    isThrown = false;
-    try {
-      strategy.assign(10, 1, 1, Sets.newHashSet("fake"), 1, -1);
-    } catch (Exception e) {
-      isThrown = true;
-    }
-    assertTrue(isThrown);
-    strategy.assign(100, 1, 1, tags, -1, -1);
-    List<Long> expect =
-        Lists.newArrayList(
-            20L, 20L, 20L, 20L, 20L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L);
-    valid(expect);
-    strategy.assign(75, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L,
-            0L);
-    valid(expect);
-    strategy.assign(100, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 20L, 20L, 20L, 
20L, 20L, 0L, 0L, 0L,
-            0L, 0L);
-    valid(expect);
-
-    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    list =
-        Lists.newArrayList(
-            7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 17L, 
8L, 1L, 3L, 3L, 6L,
-            12L);
-    updateServerResource(list);
-    strategy.assign(100, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L, 
0L, 0L, 0L, 0L, 0L);
-    valid(expect);
-    strategy.assign(50, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 20L, 0L, 0L, 20L, 10L, 10L, 0L, 20L, 0L, 10L, 20L, 10L, 20L, 
0L, 0L, 0L, 0L, 0L,
-            10L);
-    valid(expect);
-
-    strategy.assign(75, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 20L, 0L, 0L, 20L, 25L, 10L, 15L, 20L, 15L, 25L, 20L, 25L, 20L, 
0L, 0L, 0L, 0L, 0L,
-            10L);
-    valid(expect);
-
-    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    list =
-        Lists.newArrayList(
-            7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 17L, 
8L, 1L, 3L, 3L, 6L,
-            12L);
-    updateServerResource(list);
-    strategy.assign(50, 1, 2, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L, 
0L, 0L, 0L, 0L, 0L);
-    valid(expect);
-    strategy.assign(75, 1, 2, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 20L, 0L, 0L, 50L, 30L, 0L, 0L, 20L, 0L, 30L, 20L, 30L, 20L, 
0L, 0L, 0L, 0L, 0L,
-            30L);
-    valid(expect);
-    strategy.assign(33, 1, 2, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 33L, 0L, 0L, 50L, 30L, 14L, 13L, 20L, 13L, 30L, 20L, 30L, 20L, 
13L, 0L, 0L, 0L, 0L,
-            30L);
-    valid(expect);
-
-    list = Lists.newArrayList();
-    for (int i = 0; i < 20; i++) {
-      if (i % 2 == 0) {
+      List<Long> list = Lists.newArrayList();
+      for (int i = 0; i < 20; i++) {
         list.add(10L);
-      } else {
-        list.add(20L);
       }
-    }
+      updateServerResource(clusterManager, list);
+      boolean isThrown = false;
+      try {
+        strategy.assign(100, 2, 1, tags, -1, -1);
+      } catch (Exception e) {
+        isThrown = true;
+      }
+      assertTrue(isThrown);
+      try {
+        strategy.assign(0, 1, 1, tags, -1, -1);
+      } catch (Exception e) {
+        fail();
+      }
+      isThrown = false;
+      try {
+        strategy.assign(10, 1, 1, Sets.newHashSet("fake"), 1, -1);
+      } catch (Exception e) {
+        isThrown = true;
+      }
+      assertTrue(isThrown);
+      strategy.assign(100, 1, 1, tags, -1, -1);
+      List<Long> expect =
+          Lists.newArrayList(
+              20L, 20L, 20L, 20L, 20L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(75, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L, 0L,
+              0L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(100, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 20L, 20L, 20L, 
20L, 20L, 0L, 0L, 0L,
+              0L, 0L);
+      valid(clusterManager, strategy, expect);
+
+      Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+      list =
+          Lists.newArrayList(
+              7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 
17L, 8L, 1L, 3L, 3L, 6L,
+              12L);
+      updateServerResource(clusterManager, list);
+      strategy.assign(100, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L, 
0L, 0L, 0L, 0L, 0L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(50, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 20L, 0L, 0L, 20L, 10L, 10L, 0L, 20L, 0L, 10L, 20L, 10L, 20L, 
0L, 0L, 0L, 0L, 0L,
+              10L);
+      valid(clusterManager, strategy, expect);
+
+      strategy.assign(75, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 20L, 0L, 0L, 20L, 25L, 10L, 15L, 20L, 15L, 25L, 20L, 25L, 
20L, 0L, 0L, 0L, 0L, 0L,
+              10L);
+      valid(clusterManager, strategy, expect);
+
+      Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+      list =
+          Lists.newArrayList(
+              7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 
17L, 8L, 1L, 3L, 3L, 6L,
+              12L);
+      updateServerResource(clusterManager, list);
+      strategy.assign(50, 1, 2, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L, 
0L, 0L, 0L, 0L, 0L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(75, 1, 2, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 20L, 0L, 0L, 50L, 30L, 0L, 0L, 20L, 0L, 30L, 20L, 30L, 20L, 
0L, 0L, 0L, 0L, 0L,
+              30L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(33, 1, 2, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 33L, 0L, 0L, 50L, 30L, 14L, 13L, 20L, 13L, 30L, 20L, 30L, 
20L, 13L, 0L, 0L, 0L,
+              0L, 30L);
+      valid(clusterManager, strategy, expect);
+
+      list = Lists.newArrayList();
+      for (int i = 0; i < 20; i++) {
+        if (i % 2 == 0) {
+          list.add(10L);
+        } else {
+          list.add(20L);
+        }
+      }
 
-    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    updateServerResource(list);
-    strategy.assign(33, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L);
-    valid(expect);
-    strategy.assign(41, 1, 2, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 17L, 0L, 17L, 0L, 16L, 
0L, 16L, 0L, 16L);
-    valid(expect);
-    strategy.assign(23, 1, 1, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            5L, 7L, 5L, 7L, 5L, 7L, 4L, 6L, 4L, 6L, 0L, 17L, 0L, 17L, 0L, 16L, 
0L, 16L, 0L, 16L);
-    valid(expect);
-    strategy.assign(11, 1, 3, tags, -1, -1);
-    expect =
-        Lists.newArrayList(
-            5L, 7L, 5L, 7L, 5L, 7L, 4L, 13L, 4L, 13L, 7L, 17L, 6L, 17L, 6L, 
16L, 0L, 16L, 0L, 16L);
-    valid(expect);
+      Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+      updateServerResource(clusterManager, list);
+      strategy.assign(33, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(41, 1, 2, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 17L, 0L, 17L, 0L, 
16L, 0L, 16L, 0L, 16L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(23, 1, 1, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              5L, 7L, 5L, 7L, 5L, 7L, 4L, 6L, 4L, 6L, 0L, 17L, 0L, 17L, 0L, 
16L, 0L, 16L, 0L, 16L);
+      valid(clusterManager, strategy, expect);
+      strategy.assign(11, 1, 3, tags, -1, -1);
+      expect =
+          Lists.newArrayList(
+              5L, 7L, 5L, 7L, 5L, 7L, 4L, 13L, 4L, 13L, 7L, 17L, 6L, 17L, 6L, 
16L, 0L, 16L, 0L,
+              16L);
+      valid(clusterManager, strategy, expect);
+    }
   }
 
-  private void valid(List<Long> expect) {
+  private void valid(
+      SimpleClusterManager clusterManager,
+      PartitionBalanceAssignmentStrategy strategy,
+      List<Long> expect) {
     assertEquals(20, expect.size());
     int i = 0;
     List<ServerNode> list = clusterManager.getServerList(tags);
@@ -206,13 +204,7 @@ public class PartitionBalanceAssignmentStrategyTest {
     }
   }
 
-  @AfterEach
-  public void tearDown() throws IOException {
-    clusterManager.clear();
-    clusterManager.close();
-  }
-
-  void updateServerResource(List<Long> resources) {
+  void updateServerResource(SimpleClusterManager clusterManager, List<Long> 
resources) {
     for (int i = 0; i < 20; i++) {
       ServerNode node =
           new ServerNode(
@@ -229,76 +221,88 @@ public class PartitionBalanceAssignmentStrategyTest {
   }
 
   @Test
-  public void testAssignmentShuffleNodesNum() {
-    Set<String> serverTags = Sets.newHashSet("tag-1");
+  public void testAssignmentShuffleNodesNum() throws Exception {
+    CoordinatorConf ssc = new CoordinatorConf();
+    ssc.set(
+        CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+        AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+    ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      PartitionBalanceAssignmentStrategy strategy =
+          new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
 
-    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
-    }
+      Set<String> serverTags = Sets.newHashSet("tag-1");
+
+      for (int i = 0; i < 20; ++i) {
+        clusterManager.add(
+            new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
 
-    /**
-     * case1: user specify the illegal shuffle node num(<0) it will use the 
default shuffle nodes
-     * num when having enough servers.
-     */
-    PartitionRangeAssignment pra = strategy.assign(100, 1, 1, serverTags, -1, 
-1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case2: user specify the illegal shuffle node num(==0) it will use the 
default shuffle nodes
-     * num when having enough servers.
-     */
-    pra = strategy.assign(100, 1, 1, serverTags, 0, -1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case3: user specify the illegal shuffle node num(>default max 
limitation) it will use the
-     * default shuffle nodes num when having enough servers
-     */
-    pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax + 10, -1);
-    assertEquals(
-        shuffleNodesMax,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case4: user specify the legal shuffle node num, it will use the 
customized shuffle nodes num
-     * when having enough servers
-     */
-    pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax - 1, -1);
-    assertEquals(
-        shuffleNodesMax - 1,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
-
-    /**
-     * case5: user specify the legal shuffle node num, but cluster don't have 
enough servers, it
-     * will return the remaining servers.
-     */
-    serverTags = Sets.newHashSet("tag-2");
-    for (int i = 0; i < shuffleNodesMax - 1; ++i) {
-      clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20 
- i, 0, serverTags));
+      /**
+       * case1: user specify the illegal shuffle node num(<0) it will use the 
default shuffle nodes
+       * num when having enough servers.
+       */
+      PartitionRangeAssignment pra = strategy.assign(100, 1, 1, serverTags, 
-1, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case2: user specify the illegal shuffle node num(==0) it will use the 
default shuffle nodes
+       * num when having enough servers.
+       */
+      pra = strategy.assign(100, 1, 1, serverTags, 0, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case3: user specify the illegal shuffle node num(>default max 
limitation) it will use the
+       * default shuffle nodes num when having enough servers
+       */
+      pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax + 10, -1);
+      assertEquals(
+          shuffleNodesMax,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case4: user specify the legal shuffle node num, it will use the 
customized shuffle nodes
+       * num when having enough servers
+       */
+      pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax - 1, -1);
+      assertEquals(
+          shuffleNodesMax - 1,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
+
+      /**
+       * case5: user specify the legal shuffle node num, but cluster don't 
have enough servers, it
+       * will return the remaining servers.
+       */
+      serverTags = Sets.newHashSet("tag-2");
+      for (int i = 0; i < shuffleNodesMax - 1; ++i) {
+        clusterManager.add(
+            new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, 
serverTags));
+      }
+      pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
+      assertEquals(
+          shuffleNodesMax - 1,
+          pra.getAssignments().values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toSet())
+              .size());
     }
-    pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
-    assertEquals(
-        shuffleNodesMax - 1,
-        pra.getAssignments().values().stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet())
-            .size());
   }
 
   @Test
@@ -446,31 +450,33 @@ public class PartitionBalanceAssignmentStrategyTest {
         CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
         AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
-    List<Long> list =
-        Lists.newArrayList(
-            20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 
20L, 20L, 20L, 20L,
-            20L, 20L, 20L);
-    updateServerResource(list);
-    strategy.assign(100, 1, 2, tags, 5, 20);
-    List<Long> expect =
-        Lists.newArrayList(
-            40L, 40L, 40L, 40L, 40L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L);
-    valid(expect);
-
-    strategy.assign(28, 1, 2, tags, 5, 20);
-    expect =
-        Lists.newArrayList(
-            40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L,
-            0L);
-    valid(expect);
-
-    strategy.assign(29, 1, 2, tags, 5, 4);
-    expect =
-        Lists.newArrayList(
-            40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 11L, 12L, 12L, 
12L, 11L, 0L, 0L, 0L,
-            0L, 0L);
-    valid(expect);
+    try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, 
new Configuration())) {
+      PartitionBalanceAssignmentStrategy strategy =
+          new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+      List<Long> list =
+          Lists.newArrayList(
+              20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 
20L, 20L, 20L, 20L,
+              20L, 20L, 20L);
+      updateServerResource(clusterManager, list);
+      strategy.assign(100, 1, 2, tags, 5, 20);
+      List<Long> expect =
+          Lists.newArrayList(
+              40L, 40L, 40L, 40L, 40L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L);
+      valid(clusterManager, strategy, expect);
+
+      strategy.assign(28, 1, 2, tags, 5, 20);
+      expect =
+          Lists.newArrayList(
+              40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 0L, 0L, 0L, 
0L, 0L, 0L, 0L, 0L, 0L,
+              0L);
+      valid(clusterManager, strategy, expect);
+
+      strategy.assign(29, 1, 2, tags, 5, 4);
+      expect =
+          Lists.newArrayList(
+              40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 11L, 12L, 12L, 
12L, 11L, 0L, 0L, 0L,
+              0L, 0L);
+      valid(clusterManager, strategy, expect);
+    }
   }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index ba0a456a0..cd2592919 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,6 +68,11 @@ public class AppBalanceSelectStorageStrategyTest {
     applicationManager.closeDetectStorageScheduler();
   }
 
+  @AfterEach
+  public void tearDown() {
+    applicationManager.close();
+  }
+
   @Test
   public void selectStorageTest() throws Exception {
     String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + 
remotePath2;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 770086b2d..642eec22c 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
 
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -72,6 +73,11 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     applicationManager.closeDetectStorageScheduler();
   }
 
+  @AfterEach
+  public void tearDown() {
+    applicationManager.close();
+  }
+
   @Test
   public void selectStorageTest() throws Exception {
 
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
index 53df37d9d..7f2a2b0f4 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.coordinator.web;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * Simulate Java process execution for testing purposes. This method can truly 
simulate the
@@ -63,6 +66,8 @@ public class UniffleJavaProcess {
       process.destroy();
       process.waitFor();
       process.exitValue();
+      // Wait for a while to ensure the port is released
+      Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
     }
   }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
index 05f0a8f21..ab9e931c0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
@@ -70,90 +70,91 @@ public class AccessCandidatesCheckerHadoopTest extends 
HadoopTestBase {
     conf.setString(
         CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker");
-    ApplicationManager applicationManager = new ApplicationManager(conf);
-    // file load checking at startup
-    Exception expectedException = null;
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      expectedException = e;
-    }
-    assertNotNull(expectedException);
-    assertTrue(
-        expectedException
-            .getMessage()
-            .contains(
-                "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
-    conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
candidatesFile);
-    expectedException = null;
-    try {
-      new AccessManager(conf, null, applicationManager.getQuotaManager(), new 
Configuration());
-    } catch (RuntimeException e) {
-      expectedException = e;
-    }
-    assertNotNull(expectedException);
-    assertTrue(
-        expectedException
-            .getMessage()
-            .contains(
-                "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+    try (ApplicationManager applicationManager = new ApplicationManager(conf)) 
{
+      // file load checking at startup
+      Exception expectedException = null;
+      try {
+        new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      } catch (RuntimeException e) {
+        expectedException = e;
+      }
+      assertNotNull(expectedException);
+      assertTrue(
+          expectedException
+              .getMessage()
+              .contains(
+                  "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+      conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, 
candidatesFile);
+      expectedException = null;
+      try {
+        new AccessManager(conf, null, applicationManager.getQuotaManager(), 
new Configuration());
+      } catch (RuntimeException e) {
+        expectedException = e;
+      }
+      assertNotNull(expectedException);
+      assertTrue(
+          expectedException
+              .getMessage()
+              .contains(
+                  "NoSuchMethodException: 
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
 
-    Path path = new Path(candidatesFile);
-    FSDataOutputStream out = fs.create(path);
+      Path path = new Path(candidatesFile);
+      FSDataOutputStream out = fs.create(path);
 
-    PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
-    printWriter.println("9527");
-    printWriter.println(" 135 ");
-    printWriter.println("2 ");
-    printWriter.flush();
-    printWriter.close();
-    AccessManager accessManager =
-        new AccessManager(conf, null, applicationManager.getQuotaManager(), 
hadoopConf);
-    AccessCandidatesChecker checker =
-        (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
-    // load the config at the beginning
-    sleep(1200);
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
+      printWriter.println("9527");
+      printWriter.println(" 135 ");
+      printWriter.println("2 ");
+      printWriter.flush();
+      printWriter.close();
+      AccessManager accessManager =
+          new AccessManager(conf, null, applicationManager.getQuotaManager(), 
hadoopConf);
+      AccessCandidatesChecker checker =
+          (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
+      // load the config at the beginning
+      sleep(1200);
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // ignore empty or wrong content
-    printWriter.println("");
-    printWriter.flush();
-    printWriter.close();
-    sleep(1300);
-    assertTrue(fs.exists(path));
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      // ignore empty or wrong content
+      printWriter.println("");
+      printWriter.flush();
+      printWriter.close();
+      sleep(1300);
+      assertTrue(fs.exists(path));
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // the config will not be changed when the conf file is deleted
-    fs.delete(path, true);
-    assertFalse(fs.exists(path));
-    sleep(1200);
-    assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("135")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1")).isSuccess());
-    assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+      // the config will not be changed when the conf file is deleted
+      fs.delete(path, true);
+      assertFalse(fs.exists(path));
+      sleep(1200);
+      assertEquals(Sets.newHashSet("2", "9527", "135"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+      assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
 
-    // the normal update config process, move the new conf file to the old one
-    Path tmpPath = new Path(candidatesFile + ".tmp");
-    out = fs.create(tmpPath);
-    printWriter = new PrintWriter(new OutputStreamWriter(out));
-    printWriter.println("9527");
-    printWriter.println(" 1357 ");
-    printWriter.flush();
-    printWriter.close();
-    fs.rename(tmpPath, path);
-    sleep(1200);
-    assertEquals(Sets.newHashSet("1357", "9527"), 
checker.getCandidates().get());
-    assertTrue(checker.check(new AccessInfo("1357")).isSuccess());
-    assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
-    checker.close();
+      // the normal update config process, move the new conf file to the old 
one
+      Path tmpPath = new Path(candidatesFile + ".tmp");
+      out = fs.create(tmpPath);
+      printWriter = new PrintWriter(new OutputStreamWriter(out));
+      printWriter.println("9527");
+      printWriter.println(" 1357 ");
+      printWriter.flush();
+      printWriter.close();
+      fs.rename(tmpPath, path);
+      sleep(1200);
+      assertEquals(Sets.newHashSet("1357", "9527"), 
checker.getCandidates().get());
+      assertTrue(checker.check(new AccessInfo("1357")).isSuccess());
+      assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+      checker.close();
+    }
   }
 }

Reply via email to