This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 417674d77 [#1675][FOLLOWUP] fix(test): Explicitly close resources to
avoid unexpected behaviors (#1740)
417674d77 is described below
commit 417674d779cc2654f7f0adca2c0a53a0da7f5b4d
Author: RickyMa <[email protected]>
AuthorDate: Tue May 28 13:57:51 2024 +0800
[#1675][FOLLOWUP] fix(test): Explicitly close resources to avoid unexpected
behaviors (#1740)
### What changes were proposed in this pull request?
Fix various flaky tests:
1. Release resources immediately after using `ApplicationManager`; otherwise
its `ScheduledExecutorService` keeps running in the background and causes
unexpected behaviors.
2. Release `DynamicClientConfService` after using it.
3. Release `SimpleClusterManager` every time after using it. The execution
order of JUnit test methods is not guaranteed to follow any particular sequence
unless we explicitly set it, so it is better not to initialize the
`SimpleClusterManager` object globally.
4. Wait for a while to ensure the port is released in `UniffleJavaProcess`.
5. Always call `stopServer` after `new CoordinatorServer()` to stop its
multiple `ExecutorService` instances from running in the background, which
could cause unexpected behaviors.
### Why are the changes needed?
For https://github.com/apache/incubator-uniffle/issues/1675.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unnecessary.
---
.../coordinator/ApplicationManagerTest.java | 6 +
.../uniffle/coordinator/CoordinatorServerTest.java | 7 +
.../uniffle/coordinator/QuotaManagerTest.java | 217 +++++-----
.../coordinator/access/AccessManagerTest.java | 91 ++--
.../checker/AccessCandidatesCheckerTest.java | 155 +++----
.../checker/AccessClusterLoadCheckerTest.java | 118 +++---
.../checker/AccessQuotaCheckerTest.java | 66 +--
.../conf/DynamicClientConfServiceTest.java | 7 +-
.../assignment/BasicAssignmentStrategyTest.java | 386 +++++++++--------
.../PartitionBalanceAssignmentStrategyTest.java | 470 +++++++++++----------
.../AppBalanceSelectStorageStrategyTest.java | 6 +
...owestIOSampleCostSelectStorageStrategyTest.java | 6 +
.../coordinator/web/UniffleJavaProcess.java | 5 +
.../test/AccessCandidatesCheckerHadoopTest.java | 161 +++----
14 files changed, 886 insertions(+), 815 deletions(-)
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 0a576eb3a..a307119e4 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -22,6 +22,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -60,6 +61,11 @@ public class ApplicationManagerTest {
applicationManager = new ApplicationManager(conf);
}
+ @AfterEach
+ public void tearDown() {
+ applicationManager.close();
+ }
+
@Test
public void refreshTest() {
String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR +
remotePath2;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
index 59e080a48..ef151207e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorServerTest.java
@@ -45,6 +45,9 @@ public class CoordinatorServerTest {
} catch (Exception e) {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
+ } finally {
+ // Always call stopServer after new CoordinatorServer to shut down
ExecutorService
+ cs2.stopServer();
}
coordinatorConf.setInteger("rss.jetty.http.port", 9529);
@@ -55,6 +58,10 @@ public class CoordinatorServerTest {
} catch (Exception e) {
assertEquals(expectMessage, e.getMessage());
assertEquals(expectStatus, ((ExitException) e).getStatus());
+ } finally {
+ // Always call stopServer after new CoordinatorServer to shut down
ExecutorService
+ cs2.stopServer();
+ cs1.stopServer();
}
final Thread t =
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index b7ce09177..41be903c2 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -61,17 +61,18 @@ public class QuotaManagerTest {
public void testDetectUserResource() {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
- ApplicationManager applicationManager = new ApplicationManager(conf);
- Awaitility.await()
- .timeout(5, TimeUnit.SECONDS)
- .until(() -> applicationManager.getDefaultUserApps().size() > 2);
-
- Integer user1 = applicationManager.getDefaultUserApps().get("user1");
- Integer user2 = applicationManager.getDefaultUserApps().get("user2");
- Integer user3 = applicationManager.getDefaultUserApps().get("user3");
- assertEquals(user1, 10);
- assertEquals(user2, 20);
- assertEquals(user3, 30);
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ Awaitility.await()
+ .timeout(5, TimeUnit.SECONDS)
+ .until(() -> applicationManager.getDefaultUserApps().size() > 2);
+
+ Integer user1 = applicationManager.getDefaultUserApps().get("user1");
+ Integer user2 = applicationManager.getDefaultUserApps().get("user2");
+ Integer user3 = applicationManager.getDefaultUserApps().get("user3");
+ assertEquals(user1, 10);
+ assertEquals(user2, 20);
+ assertEquals(user3, 30);
+ }
}
@Test
@@ -82,10 +83,11 @@ public class QuotaManagerTest {
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
Lists.newArrayList(
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
- ApplicationManager applicationManager = new ApplicationManager(conf);
- // it didn't detectUserResource because
`org.apache.unifle.coordinator.AccessQuotaChecker` is
- // not configured
- assertNull(applicationManager.getQuotaManager());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ // it didn't detectUserResource because
`org.apache.unifle.coordinator.AccessQuotaChecker` is
+ // not configured
+ assertNull(applicationManager.getQuotaManager());
+ }
}
@Test
@@ -93,34 +95,35 @@ public class QuotaManagerTest {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 5);
- final ApplicationManager applicationManager = new ApplicationManager(conf);
- final AtomicInteger uuid = new AtomicInteger();
- Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
- uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
- final int i1 = uuid.incrementAndGet();
- uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
- Map<String, Long> appAndTime =
- applicationManager
- .getQuotaManager()
- .getCurrentUserAndApp()
- .computeIfAbsent("user4", x -> uuidAndTime);
- // This thread may remove the uuid and put the appId in.
- final Thread registerThread =
- new Thread(
- () ->
- applicationManager
- .getQuotaManager()
- .registerApplicationInfo("application_test_" + i1,
appAndTime));
- registerThread.start();
- final boolean icCheck =
- applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i1));
- registerThread.join();
- assertTrue(icCheck);
- assertEquals(
-
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
5);
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ final AtomicInteger uuid = new AtomicInteger();
+ Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(uuid.incrementAndGet()),
System.currentTimeMillis());
+ final int i1 = uuid.incrementAndGet();
+ uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+ Map<String, Long> appAndTime =
+ applicationManager
+ .getQuotaManager()
+ .getCurrentUserAndApp()
+ .computeIfAbsent("user4", x -> uuidAndTime);
+ // This thread may remove the uuid and put the appId in.
+ final Thread registerThread =
+ new Thread(
+ () ->
+ applicationManager
+ .getQuotaManager()
+ .registerApplicationInfo("application_test_" + i1,
appAndTime));
+ registerThread.start();
+ final boolean icCheck =
+ applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i1));
+ registerThread.join();
+ assertTrue(icCheck);
+ assertEquals(
+
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
5);
+ }
}
@Test
@@ -131,75 +134,77 @@ public class QuotaManagerTest {
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
conf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 1500);
conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 2);
- final ApplicationManager applicationManager = new ApplicationManager(conf);
- final AtomicInteger uuid = new AtomicInteger();
- final int i1 = uuid.incrementAndGet();
- final int i2 = uuid.incrementAndGet();
- final int i3 = uuid.incrementAndGet();
- final int i4 = uuid.incrementAndGet();
- Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
- uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
- uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
- final boolean icCheck =
- applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i1));
- final boolean icCheck2 =
- applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i2));
- final boolean icCheck3 =
- applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i3));
- final boolean icCheck4 =
- applicationManager.getQuotaManager().checkQuota("user3",
String.valueOf(i4));
- assertFalse(icCheck);
- assertFalse(icCheck2);
- // The default number of tasks submitted is 2, and the third will be
rejected
- assertTrue(icCheck3);
- assertFalse(icCheck4);
- assertEquals(
-
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
2);
-
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(),
2);
-
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(),
1);
- await()
- .atMost(2, TimeUnit.SECONDS)
- .until(
- () -> {
- applicationManager.statusCheck();
- // If the number of apps corresponding to this user is 0, remove
this user
- return
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
- &&
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
- });
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ final AtomicInteger uuid = new AtomicInteger();
+ final int i1 = uuid.incrementAndGet();
+ final int i2 = uuid.incrementAndGet();
+ final int i3 = uuid.incrementAndGet();
+ final int i4 = uuid.incrementAndGet();
+ Map<String, Long> uuidAndTime = JavaUtils.newConcurrentMap();
+ uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
+ final boolean icCheck =
+ applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i1));
+ final boolean icCheck2 =
+ applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i2));
+ final boolean icCheck3 =
+ applicationManager.getQuotaManager().checkQuota("user4",
String.valueOf(i3));
+ final boolean icCheck4 =
+ applicationManager.getQuotaManager().checkQuota("user3",
String.valueOf(i4));
+ assertFalse(icCheck);
+ assertFalse(icCheck2);
+ // The default number of tasks submitted is 2, and the third will be
rejected
+ assertTrue(icCheck3);
+ assertFalse(icCheck4);
+ assertEquals(
+
applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(),
2);
+
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(),
2);
+
assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(),
1);
+ await()
+ .atMost(2, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ applicationManager.statusCheck();
+ // If the number of apps corresponding to this user is 0,
remove this user
+ return
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
+ &&
CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
+ });
+ }
}
@Test
public void testCheckQuotaWithDefault() {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
- final ApplicationManager applicationManager = new ApplicationManager(conf);
- Awaitility.await()
- .timeout(5, TimeUnit.SECONDS)
- .until(() -> applicationManager.getDefaultUserApps().size() > 2);
-
- QuotaManager quotaManager = applicationManager.getQuotaManager();
- Map<String, Map<String, Long>> currentUserAndApp =
quotaManager.getCurrentUserAndApp();
-
- currentUserAndApp.computeIfAbsent("user1", x -> mockUUidAppAndTime(30));
- currentUserAndApp.computeIfAbsent("user2", x -> mockUUidAppAndTime(20));
- currentUserAndApp.computeIfAbsent("user3", x -> mockUUidAppAndTime(29));
- currentUserAndApp.computeIfAbsent("disable_quota_user1", x ->
mockUUidAppAndTime(100));
- currentUserAndApp.computeIfAbsent("blank_user1", x ->
mockUUidAppAndTime(0));
-
- assertEquals(currentUserAndApp.get("user1").size(), 30);
- assertEquals(currentUserAndApp.get("user2").size(), 20);
- assertEquals(currentUserAndApp.get("user3").size(), 29);
- assertEquals(currentUserAndApp.get("disable_quota_user1").size(), 100);
- assertEquals(currentUserAndApp.get("blank_user1").size(), 0);
-
- assertTrue(quotaManager.checkQuota("user1", mockUUidAppId()));
- assertTrue(quotaManager.checkQuota("user2", mockUUidAppId()));
- assertFalse(quotaManager.checkQuota("user3", mockUUidAppId()));
- assertTrue(quotaManager.checkQuota("user3", mockUUidAppId()));
- assertFalse(quotaManager.checkQuota("disable_quota_user1",
mockUUidAppId()));
- assertTrue(quotaManager.checkQuota("blank_user1", mockUUidAppId()));
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ Awaitility.await()
+ .timeout(5, TimeUnit.SECONDS)
+ .until(() -> applicationManager.getDefaultUserApps().size() > 2);
+
+ QuotaManager quotaManager = applicationManager.getQuotaManager();
+ Map<String, Map<String, Long>> currentUserAndApp =
quotaManager.getCurrentUserAndApp();
+
+ currentUserAndApp.computeIfAbsent("user1", x -> mockUUidAppAndTime(30));
+ currentUserAndApp.computeIfAbsent("user2", x -> mockUUidAppAndTime(20));
+ currentUserAndApp.computeIfAbsent("user3", x -> mockUUidAppAndTime(29));
+ currentUserAndApp.computeIfAbsent("disable_quota_user1", x ->
mockUUidAppAndTime(100));
+ currentUserAndApp.computeIfAbsent("blank_user1", x ->
mockUUidAppAndTime(0));
+
+ assertEquals(currentUserAndApp.get("user1").size(), 30);
+ assertEquals(currentUserAndApp.get("user2").size(), 20);
+ assertEquals(currentUserAndApp.get("user3").size(), 29);
+ assertEquals(currentUserAndApp.get("disable_quota_user1").size(), 100);
+ assertEquals(currentUserAndApp.get("blank_user1").size(), 0);
+
+ assertTrue(quotaManager.checkQuota("user1", mockUUidAppId()));
+ assertTrue(quotaManager.checkQuota("user2", mockUUidAppId()));
+ assertFalse(quotaManager.checkQuota("user3", mockUUidAppId()));
+ assertTrue(quotaManager.checkQuota("user3", mockUUidAppId()));
+ assertFalse(quotaManager.checkQuota("disable_quota_user1",
mockUUidAppId()));
+ assertTrue(quotaManager.checkQuota("blank_user1", mockUUidAppId()));
+ }
}
private String mockUUidAppId() {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
index bec781320..350982c6e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
@@ -53,53 +53,54 @@ public class AccessManagerTest {
// test init
CoordinatorConf conf = new CoordinatorConf();
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
- ApplicationManager applicationManager = new ApplicationManager(conf);
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- String expectedMessage = "Empty classes";
- assertTrue(e.getMessage().startsWith(expectedMessage));
- }
- conf.setString(
- CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-
"com.Dummy,org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessChecker");
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
- assertTrue(e.getMessage().startsWith(expectedMessage));
- }
- // test empty checkers
- conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
- AccessManager accessManager =
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ try {
new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
- assertTrue(
- accessManager
- .handleAccessRequest(
- new AccessInfo(
- String.valueOf(new Random().nextInt()),
- Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
- Collections.emptyMap(),
- "user"))
- .isSuccess());
- accessManager.close();
- // test mock checkers
- String alwaysTrueClassName = MockAccessCheckerAlwaysTrue.class.getName();
- conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
alwaysTrueClassName + ",");
- accessManager =
+ } catch (RuntimeException e) {
+ String expectedMessage = "Empty classes";
+ assertTrue(e.getMessage().startsWith(expectedMessage));
+ }
+ conf.setString(
+ CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
+
"com.Dummy,org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessChecker");
+ try {
new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
- assertEquals(1, accessManager.getAccessCheckers().size());
- assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
- assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock2")).isSuccess());
- String alwaysFalseClassName = MockAccessCheckerAlwaysFalse.class.getName();
- conf.setString(
- CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- alwaysTrueClassName + "," + alwaysFalseClassName);
- accessManager =
- new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
- assertEquals(2, accessManager.getAccessCheckers().size());
- assertFalse(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
- accessManager.close();
+ } catch (RuntimeException e) {
+ String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
+ assertTrue(e.getMessage().startsWith(expectedMessage));
+ }
+ // test empty checkers
+ conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
+ AccessManager accessManager =
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ assertTrue(
+ accessManager
+ .handleAccessRequest(
+ new AccessInfo(
+ String.valueOf(new Random().nextInt()),
+ Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION),
+ Collections.emptyMap(),
+ "user"))
+ .isSuccess());
+ accessManager.close();
+ // test mock checkers
+ String alwaysTrueClassName = MockAccessCheckerAlwaysTrue.class.getName();
+ conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
alwaysTrueClassName + ",");
+ accessManager =
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ assertEquals(1, accessManager.getAccessCheckers().size());
+ assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
+ assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock2")).isSuccess());
+ String alwaysFalseClassName =
MockAccessCheckerAlwaysFalse.class.getName();
+ conf.setString(
+ CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
+ alwaysTrueClassName + "," + alwaysFalseClassName);
+ accessManager =
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ assertEquals(2, accessManager.getAccessCheckers().size());
+ assertFalse(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
+ accessManager.close();
+ }
}
public static class MockAccessCheckerAlwaysTrue extends
AbstractAccessChecker {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
index 86bde900e..2b8ba8d55 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
@@ -66,86 +66,87 @@ public class AccessCandidatesCheckerTest {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
tempDir.toURI().toString());
String checkerClassName = AccessCandidatesChecker.class.getName();
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
checkerClassName);
- final ApplicationManager applicationManager = new ApplicationManager(conf);
- // file load checking at startup
- Exception expectedException = null;
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- expectedException = e;
- }
- assertNotNull(expectedException);
- assertTrue(
- expectedException
- .getMessage()
- .contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
- conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
cfgFile.toURI().toString());
- expectedException = null;
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- expectedException = e;
- }
- assertNotNull(expectedException);
- assertTrue(
- expectedException
- .getMessage()
- .contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
-
- // load the config at the beginning
- FileWriter fileWriter = new FileWriter(cfgFile);
- PrintWriter printWriter = new PrintWriter(fileWriter);
- printWriter.println("9527");
- printWriter.println(" 135 ");
- printWriter.println("2 ");
- printWriter.flush();
- printWriter.close();
- AccessManager accessManager =
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ // file load checking at startup
+ Exception expectedException = null;
+ try {
new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
- AccessCandidatesChecker checker =
- (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
- sleep(1200);
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ } catch (RuntimeException e) {
+ expectedException = e;
+ }
+ assertNotNull(expectedException);
+ assertTrue(
+ expectedException
+ .getMessage()
+ .contains(
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+ conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
cfgFile.toURI().toString());
+ expectedException = null;
+ try {
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ } catch (RuntimeException e) {
+ expectedException = e;
+ }
+ assertNotNull(expectedException);
+ assertTrue(
+ expectedException
+ .getMessage()
+ .contains(
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+
+ // load the config at the beginning
+ FileWriter fileWriter = new FileWriter(cfgFile);
+ PrintWriter printWriter = new PrintWriter(fileWriter);
+ printWriter.println("9527");
+ printWriter.println(" 135 ");
+ printWriter.println("2 ");
+ printWriter.flush();
+ printWriter.close();
+ AccessManager accessManager =
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ AccessCandidatesChecker checker =
+ (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
+ sleep(1200);
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // ignore empty or wrong content
- printWriter.println("");
- printWriter.flush();
- printWriter.close();
- sleep(1300);
- assertTrue(cfgFile.exists());
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ // ignore empty or wrong content
+ printWriter.println("");
+ printWriter.flush();
+ printWriter.close();
+ sleep(1300);
+ assertTrue(cfgFile.exists());
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // the config will not be changed when the conf file is deleted
- assertTrue(cfgFile.delete());
- sleep(1200);
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ // the config will not be changed when the conf file is deleted
+ assertTrue(cfgFile.delete());
+ sleep(1200);
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // the normal update config process, move the new conf file to the old one
- File cfgFileTmp = new File(cfgFileName + ".tmp");
- fileWriter = new FileWriter(cfgFileTmp);
- printWriter = new PrintWriter(fileWriter);
- printWriter.println("13");
- printWriter.println("57");
- printWriter.close();
- FileUtils.moveFile(cfgFileTmp, cfgFile);
- sleep(1200);
- assertEquals(Sets.newHashSet("13", "57"), checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("13")).isSuccess());
- assertTrue(checker.check(new AccessInfo("57")).isSuccess());
- checker.close();
+ // the normal update config process, move the new conf file to the old
one
+ File cfgFileTmp = new File(cfgFileName + ".tmp");
+ fileWriter = new FileWriter(cfgFileTmp);
+ printWriter = new PrintWriter(fileWriter);
+ printWriter.println("13");
+ printWriter.println("57");
+ printWriter.close();
+ FileUtils.moveFile(cfgFileTmp, cfgFile);
+ sleep(1200);
+ assertEquals(Sets.newHashSet("13", "57"), checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("13")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("57")).isSuccess());
+ checker.close();
+ }
}
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
index a2e697e73..de6a2d420 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
@@ -80,46 +80,47 @@ public class AccessClusterLoadCheckerTest {
conf.set(COORDINATOR_ACCESS_CHECKERS,
Collections.singletonList(clusterLoaderCheckerName));
conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
- ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager =
- new AccessManager(
- conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
-
- AccessClusterLoadChecker accessClusterLoadChecker =
- (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
-
- /**
- * case1: when setting the invalid required shuffle nodes number of job
and available servers
- * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
- */
- Map<String, String> properties = new HashMap<>();
- properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
- AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
- assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
- /**
- * case2: when setting the valid required shuffle nodes number of job and
available servers
- * greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
- */
- properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
- accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
- assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
- /**
- * case3: when setting the valid required shuffle nodes number of job and
available servers less
- * than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
- */
- properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
- accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
- assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
-
- /**
- * case4: when the required shuffle nodes number is not specified in
access info, it should use
- * the default shuffle nodes max from coordinator conf.
- */
- properties = new HashMap<>();
- accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
- assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ AccessManager accessManager =
+ new AccessManager(
+ conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
+
+ AccessClusterLoadChecker accessClusterLoadChecker =
+ (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
+
+ /**
+ * case1: when setting the invalid required shuffle nodes number of job
and available servers
+ * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+ */
+ Map<String, String> properties = new HashMap<>();
+ properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
+ assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+ /**
+ * case2: when setting the valid required shuffle nodes number of job
and available servers
+ * greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
+ */
+ properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+ assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+ /**
+ * case3: when setting the valid required shuffle nodes number of job
and available servers
+ * less than the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+ */
+ properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+ assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+ /**
+ * case4: when the required shuffle nodes number is not specified in
access info, it should
+ * use the default shuffle nodes max from coordinator conf.
+ */
+ properties = new HashMap<>();
+ accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
+ assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+ }
}
@Test
@@ -133,23 +134,24 @@ public class AccessClusterLoadCheckerTest {
.getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
clusterLoaderCheckerName);
- ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager =
- new AccessManager(
- conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
- AccessClusterLoadChecker accessClusterLoadChecker =
- (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
- when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
- assertFalse(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
- assertEquals(2, accessClusterLoadChecker.getAvailableServerNumThreshold());
- assertEquals(0,
Double.compare(accessClusterLoadChecker.getMemoryPercentThreshold(), 20.0));
- ServerNode node2 = new ServerNode("1", "1", 0, 90, 40, 10, 0, null);
- serverNodeList.add(node2);
- ServerNode node3 = new ServerNode("1", "1", 0, 80, 25, 20, 0, null);
- serverNodeList.add(node3);
- assertFalse(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
- ServerNode node4 = new ServerNode("1", "1", 0, 75, 25, 25, 0, null);
- serverNodeList.add(node4);
- assertTrue(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ AccessManager accessManager =
+ new AccessManager(
+ conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
+ AccessClusterLoadChecker accessClusterLoadChecker =
+ (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
+ when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
+ assertFalse(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
+ assertEquals(2,
accessClusterLoadChecker.getAvailableServerNumThreshold());
+ assertEquals(0,
Double.compare(accessClusterLoadChecker.getMemoryPercentThreshold(), 20.0));
+ ServerNode node2 = new ServerNode("1", "1", 0, 90, 40, 10, 0, null);
+ serverNodeList.add(node2);
+ ServerNode node3 = new ServerNode("1", "1", 0, 80, 25, 20, 0, null);
+ serverNodeList.add(node3);
+ assertFalse(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
+ ServerNode node4 = new ServerNode("1", "1", 0, 75, 25, 25, 0, null);
+ serverNodeList.add(node4);
+ assertTrue(accessClusterLoadChecker.check(new
AccessInfo("test")).isSuccess());
+ }
}
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
index 8d74c773e..5a2a23d11 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
@@ -76,37 +76,39 @@ public class AccessQuotaCheckerTest {
conf.set(
COORDINATOR_ACCESS_CHECKERS,
Collections.singletonList(AccessQuotaChecker.class.getName()));
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
- ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager =
- new AccessManager(
- conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
-
- AccessQuotaChecker accessQuotaChecker =
- (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+ Map<String, String> properties = new HashMap<>();
/**
* case1: when user set default app num is 5, and commit 6 app which
current app num is greater
* than default app num, it will reject 1 app and return false.
*/
- Map<String, String> properties = new HashMap<>();
- AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
- assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
- assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
- assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
- assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ AccessManager accessManager =
+ new AccessManager(
+ conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
+ AccessQuotaChecker accessQuotaChecker =
+ (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+ }
/**
* case2: when setting the valid required shuffle nodes number of job and
available servers
* greater than the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
*/
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 0);
- applicationManager = new ApplicationManager(conf);
- accessManager =
- new AccessManager(
- conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
- accessQuotaChecker = (AccessQuotaChecker)
accessManager.getAccessCheckers().get(0);
- accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
- assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ AccessManager accessManager =
+ new AccessManager(
+ conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
+ AccessQuotaChecker accessQuotaChecker =
+ (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
+ assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
+ }
/**
* case3: when setting two checkers and the valid required shuffle nodes
number of job and
@@ -118,16 +120,18 @@ public class AccessQuotaCheckerTest {
Arrays.asList(
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker",
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
- applicationManager = new ApplicationManager(conf);
- accessManager =
- new AccessManager(
- conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
- accessQuotaChecker = (AccessQuotaChecker)
accessManager.getAccessCheckers().get(0);
- final AccessClusterLoadChecker accessClusterLoadChecker =
- (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
- properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
- accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
- assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
- assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ AccessManager accessManager =
+ new AccessManager(
+ conf, clusterManager, applicationManager.getQuotaManager(), new
Configuration());
+ AccessQuotaChecker accessQuotaChecker =
+ (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
+ final AccessClusterLoadChecker accessClusterLoadChecker =
+ (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
+ properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+ AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(),
properties, "user");
+ assertTrue(accessQuotaChecker.check(accessInfo).isSuccess());
+ assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+ }
}
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
index 8a117b86d..790cec23b 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/conf/DynamicClientConfServiceTest.java
@@ -63,10 +63,15 @@ public class DynamicClientConfServiceTest {
// file load checking at startup
Exception expectedException = null;
+ DynamicClientConfService dynamicClientConfService = null;
try {
- new DynamicClientConfService(conf, new Configuration());
+ dynamicClientConfService = new DynamicClientConfService(conf, new
Configuration());
} catch (RuntimeException e) {
expectedException = e;
+ } finally {
+ if (dynamicClientConfService != null) {
+ dynamicClientConfService.close();
+ }
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().endsWith("is not a file."));
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
index 95db4cb43..fd03f19d5 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.coordinator.strategy.assignment;
-import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -29,8 +28,6 @@ import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.PartitionRange;
@@ -46,185 +43,202 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class BasicAssignmentStrategyTest {
Set<String> tags = Sets.newHashSet("test");
- private SimpleClusterManager clusterManager;
- private BasicAssignmentStrategy strategy;
private int shuffleNodesMax = 7;
- @BeforeEach
- public void setUp() throws Exception {
+ @Test
+ public void testAssign() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
ssc.set(
CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
- clusterManager = new SimpleClusterManager(ssc, new Configuration());
- strategy = new BasicAssignmentStrategy(clusterManager, ssc);
- }
-
- @AfterEach
- public void tearDown() throws IOException {
- clusterManager.clear();
- clusterManager.close();
- }
-
- @Test
- public void testAssign() {
- for (int i = 0; i < 20; ++i) {
- clusterManager.add(
- new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i,
0, tags));
- }
-
- PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
- SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
- assertEquals(10, assignments.size());
-
- for (int i = 0; i < 100; i += 10) {
- assertTrue(assignments.containsKey(new PartitionRange(i, i + 10)));
- }
-
- int i = 0;
- Iterator<List<ServerNode>> ite = assignments.values().iterator();
- while (ite.hasNext()) {
- List<ServerNode> cur = ite.next();
- assertEquals(2, cur.size());
- assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(0).getId());
- i++;
- assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(1).getId());
- i++;
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ BasicAssignmentStrategy strategy = new
BasicAssignmentStrategy(clusterManager, ssc);
+
+ for (int i = 0; i < 20; ++i) {
+ clusterManager.add(
+ new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, 20 - i,
0, tags));
+ }
+
+ PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
+ assertEquals(10, assignments.size());
+
+ for (int i = 0; i < 100; i += 10) {
+ assertTrue(assignments.containsKey(new PartitionRange(i, i + 10)));
+ }
+
+ int i = 0;
+ Iterator<List<ServerNode>> ite = assignments.values().iterator();
+ while (ite.hasNext()) {
+ List<ServerNode> cur = ite.next();
+ assertEquals(2, cur.size());
+ assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(0).getId());
+ i++;
+ assertEquals(String.valueOf(i % shuffleNodesMax), cur.get(1).getId());
+ i++;
+ }
}
}
@Test
- public void testRandomAssign() {
- for (int i = 0; i < 20; ++i) {
- clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0,
0, 0, 0, 0, tags));
- }
- PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
- SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
- Set<ServerNode> serverNodes1 = Sets.newHashSet();
- for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
- serverNodes1.addAll(assignment.getValue());
- }
-
- pra = strategy.assign(100, 10, 2, tags, -1, -1);
- assignments = pra.getAssignments();
- Set<ServerNode> serverNodes2 = Sets.newHashSet();
- for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
- serverNodes2.addAll(assignment.getValue());
+ public void testRandomAssign() throws Exception {
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.set(
+ CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+ AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+ ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ BasicAssignmentStrategy strategy = new
BasicAssignmentStrategy(clusterManager, ssc);
+ for (int i = 0; i < 20; ++i) {
+ clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i,
0, 0, 0, 0, 0, tags));
+ }
+ PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
+ Set<ServerNode> serverNodes1 = Sets.newHashSet();
+ for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
+ serverNodes1.addAll(assignment.getValue());
+ }
+
+ pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ assignments = pra.getAssignments();
+ Set<ServerNode> serverNodes2 = Sets.newHashSet();
+ for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
+ serverNodes2.addAll(assignment.getValue());
+ }
+
+ // test for the random node pick, there is a little possibility failed
+ assertFalse(serverNodes1.containsAll(serverNodes2));
}
-
- // test for the random node pick, there is a little possibility failed
- assertFalse(serverNodes1.containsAll(serverNodes2));
}
@Test
- public void testAssignWithDifferentNodeNum() {
- final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0, 20, 0, tags);
- final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0, 10, 0, tags);
- final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0, 0, 0, tags);
-
- clusterManager.add(sn1);
- PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
- // nodeNum < replica
- assertNull(pra.getAssignments());
-
- // nodeNum = replica
- clusterManager.add(sn2);
- pra = strategy.assign(100, 10, 2, tags, -1, -1);
- SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
- Set<ServerNode> serverNodes = Sets.newHashSet();
- for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
- serverNodes.addAll(assignment.getValue());
- }
- assertEquals(2, serverNodes.size());
- assertTrue(serverNodes.contains(sn1));
- assertTrue(serverNodes.contains(sn2));
-
- // nodeNum > replica & nodeNum < shuffleNodesMax
- clusterManager.add(sn3);
- pra = strategy.assign(100, 10, 2, tags, -1, -1);
- assignments = pra.getAssignments();
- serverNodes = Sets.newHashSet();
- for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
- serverNodes.addAll(assignment.getValue());
+ public void testAssignWithDifferentNodeNum() throws Exception {
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.set(
+ CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+ AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+ ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ BasicAssignmentStrategy strategy = new
BasicAssignmentStrategy(clusterManager, ssc);
+
+ final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0, 20, 0, tags);
+ final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0, 10, 0, tags);
+ final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0, 0, 0, tags);
+
+ clusterManager.add(sn1);
+ PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ // nodeNum < replica
+ assertNull(pra.getAssignments());
+
+ // nodeNum = replica
+ clusterManager.add(sn2);
+ pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ SortedMap<PartitionRange, List<ServerNode>> assignments =
pra.getAssignments();
+ Set<ServerNode> serverNodes = Sets.newHashSet();
+ for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
+ serverNodes.addAll(assignment.getValue());
+ }
+ assertEquals(2, serverNodes.size());
+ assertTrue(serverNodes.contains(sn1));
+ assertTrue(serverNodes.contains(sn2));
+
+ // nodeNum > replica & nodeNum < shuffleNodesMax
+ clusterManager.add(sn3);
+ pra = strategy.assign(100, 10, 2, tags, -1, -1);
+ assignments = pra.getAssignments();
+ serverNodes = Sets.newHashSet();
+ for (Map.Entry<PartitionRange, List<ServerNode>> assignment :
assignments.entrySet()) {
+ serverNodes.addAll(assignment.getValue());
+ }
+ assertEquals(3, serverNodes.size());
+ assertTrue(serverNodes.contains(sn1));
+ assertTrue(serverNodes.contains(sn2));
+ assertTrue(serverNodes.contains(sn3));
}
- assertEquals(3, serverNodes.size());
- assertTrue(serverNodes.contains(sn1));
- assertTrue(serverNodes.contains(sn2));
- assertTrue(serverNodes.contains(sn3));
}
@Test
- public void testAssignmentShuffleNodesNum() {
- Set<String> serverTags = Sets.newHashSet("tag-1");
-
- for (int i = 0; i < 20; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0, serverTags));
- }
-
- /**
- * case1: user specify the illegal shuffle node num(<0) it will use the
default shuffle nodes
- * num when having enough servers.
- */
- PartitionRangeAssignment pra = strategy.assign(100, 10, 1, serverTags, -1, -1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case2: user specify the illegal shuffle node num(==0) it will use the
default shuffle nodes
- * num when having enough servers.
- */
- pra = strategy.assign(100, 10, 1, serverTags, 0, -1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case3: user specify the illegal shuffle node num(>default max
limitation) it will use the
- * default shuffle nodes num when having enough servers
- */
- pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax + 10, -1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case4: user specify the legal shuffle node num, it will use the
customized shuffle nodes num
- * when having enough servers
- */
- pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax - 1, -1);
- assertEquals(
- shuffleNodesMax - 1,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case5: user specify the legal shuffle node num, but cluster don't have
enough servers, it
- * will return the remaining servers.
- */
- serverTags = Sets.newHashSet("tag-2");
- for (int i = 0; i < shuffleNodesMax - 1; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, 20 - i, 0,
serverTags));
+ public void testAssignmentShuffleNodesNum() throws Exception {
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.set(
+ CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+ AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+ ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ BasicAssignmentStrategy strategy = new
BasicAssignmentStrategy(clusterManager, ssc);
+
+ Set<String> serverTags = Sets.newHashSet("tag-1");
+
+ for (int i = 0; i < 20; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+
+ /**
+ * case1: user specify the illegal shuffle node num(<0) it will use the
default shuffle nodes
+ * num when having enough servers.
+ */
+ PartitionRangeAssignment pra = strategy.assign(100, 10, 1, serverTags, -1, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case2: user specify the illegal shuffle node num(==0) it will use the
default shuffle nodes
+ * num when having enough servers.
+ */
+ pra = strategy.assign(100, 10, 1, serverTags, 0, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case3: user specify the illegal shuffle node num(>default max
limitation) it will use the
+ * default shuffle nodes num when having enough servers
+ */
+ pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax + 10, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case4: user specify the legal shuffle node num, it will use the
customized shuffle nodes
+ * num when having enough servers
+ */
+ pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax - 1, -1);
+ assertEquals(
+ shuffleNodesMax - 1,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case5: user specify the legal shuffle node num, but cluster don't
have enough servers, it
+ * will return the remaining servers.
+ */
+ serverTags = Sets.newHashSet("tag-2");
+ for (int i = 0; i < shuffleNodesMax - 1; ++i) {
+ clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
+ assertEquals(
+ shuffleNodesMax - 1,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
}
- pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1);
- assertEquals(
- shuffleNodesMax - 1,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
}
@Test
@@ -234,31 +248,33 @@ public class BasicAssignmentStrategyTest {
CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
- clusterManager = new SimpleClusterManager(ssc, new Configuration());
- strategy = new BasicAssignmentStrategy(clusterManager, ssc);
- List<Long> list =
- Lists.newArrayList(
- 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L,
20L, 20L, 20L, 20L,
- 20L, 20L, 20L);
- updateServerResource(list);
- PartitionRangeAssignment assignment = strategy.assign(100, 1, 2, tags, 5,
20);
- List<Long> expect = Lists.newArrayList(40L, 40L, 40L, 40L, 40L);
- valid(expect, assignment.getAssignments());
-
- assignment = strategy.assign(28, 1, 2, tags, 5, 20);
- expect = Lists.newArrayList(11L, 12L, 12L, 11L, 10L);
- valid(expect, assignment.getAssignments());
-
- assignment = strategy.assign(29, 1, 2, tags, 5, 4);
- expect = Lists.newArrayList(11L, 12L, 12L, 12L, 11L);
- valid(expect, assignment.getAssignments());
-
- assignment = strategy.assign(29, 2, 2, tags, 5, 4);
- expect = Lists.newArrayList(12L, 12L, 12L, 12L, 12L);
- valid(expect, assignment.getAssignments());
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ BasicAssignmentStrategy strategy = new
BasicAssignmentStrategy(clusterManager, ssc);
+
+ List<Long> list =
+ Lists.newArrayList(
+ 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L,
20L, 20L, 20L, 20L,
+ 20L, 20L, 20L);
+ updateServerResource(clusterManager, list);
+ PartitionRangeAssignment assignment = strategy.assign(100, 1, 2, tags,
5, 20);
+ List<Long> expect = Lists.newArrayList(40L, 40L, 40L, 40L, 40L);
+ valid(expect, assignment.getAssignments());
+
+ assignment = strategy.assign(28, 1, 2, tags, 5, 20);
+ expect = Lists.newArrayList(11L, 12L, 12L, 11L, 10L);
+ valid(expect, assignment.getAssignments());
+
+ assignment = strategy.assign(29, 1, 2, tags, 5, 4);
+ expect = Lists.newArrayList(11L, 12L, 12L, 12L, 11L);
+ valid(expect, assignment.getAssignments());
+
+ assignment = strategy.assign(29, 2, 2, tags, 5, 4);
+ expect = Lists.newArrayList(12L, 12L, 12L, 12L, 12L);
+ valid(expect, assignment.getAssignments());
+ }
}
- void updateServerResource(List<Long> resources) {
+ void updateServerResource(SimpleClusterManager clusterManager, List<Long>
resources) {
for (int i = 0; i < resources.size(); i++) {
ServerNode node =
new ServerNode(
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index 2d80c65a4..60d6cb873 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -17,7 +17,6 @@
package org.apache.uniffle.coordinator.strategy.assignment;
-import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -31,8 +30,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -46,149 +43,150 @@ import static org.junit.jupiter.api.Assertions.fail;
public class PartitionBalanceAssignmentStrategyTest {
- private SimpleClusterManager clusterManager;
- private PartitionBalanceAssignmentStrategy strategy;
private int shuffleNodesMax = 5;
private Set<String> tags = Sets.newHashSet("test");
- @BeforeEach
- public void setUp() throws Exception {
+ @Test
+ public void testAssign() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
ssc.set(
CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
- clusterManager = new SimpleClusterManager(ssc, new Configuration());
- strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- }
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ PartitionBalanceAssignmentStrategy strategy =
+ new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- @Test
- public void testAssign() {
- List<Long> list = Lists.newArrayList();
- for (int i = 0; i < 20; i++) {
- list.add(10L);
- }
- updateServerResource(list);
- boolean isThrown = false;
- try {
- strategy.assign(100, 2, 1, tags, -1, -1);
- } catch (Exception e) {
- isThrown = true;
- }
- assertTrue(isThrown);
- try {
- strategy.assign(0, 1, 1, tags, -1, -1);
- } catch (Exception e) {
- fail();
- }
- isThrown = false;
- try {
- strategy.assign(10, 1, 1, Sets.newHashSet("fake"), 1, -1);
- } catch (Exception e) {
- isThrown = true;
- }
- assertTrue(isThrown);
- strategy.assign(100, 1, 1, tags, -1, -1);
- List<Long> expect =
- Lists.newArrayList(
- 20L, 20L, 20L, 20L, 20L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L);
- valid(expect);
- strategy.assign(75, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L,
- 0L);
- valid(expect);
- strategy.assign(100, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 20L, 20L, 20L,
20L, 20L, 0L, 0L, 0L,
- 0L, 0L);
- valid(expect);
-
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
- list =
- Lists.newArrayList(
- 7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 17L,
8L, 1L, 3L, 3L, 6L,
- 12L);
- updateServerResource(list);
- strategy.assign(100, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L,
0L, 0L, 0L, 0L, 0L);
- valid(expect);
- strategy.assign(50, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 20L, 0L, 0L, 20L, 10L, 10L, 0L, 20L, 0L, 10L, 20L, 10L, 20L,
0L, 0L, 0L, 0L, 0L,
- 10L);
- valid(expect);
-
- strategy.assign(75, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 20L, 0L, 0L, 20L, 25L, 10L, 15L, 20L, 15L, 25L, 20L, 25L, 20L,
0L, 0L, 0L, 0L, 0L,
- 10L);
- valid(expect);
-
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
- list =
- Lists.newArrayList(
- 7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L, 17L,
8L, 1L, 3L, 3L, 6L,
- 12L);
- updateServerResource(list);
- strategy.assign(50, 1, 2, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L,
0L, 0L, 0L, 0L, 0L);
- valid(expect);
- strategy.assign(75, 1, 2, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 20L, 0L, 0L, 50L, 30L, 0L, 0L, 20L, 0L, 30L, 20L, 30L, 20L,
0L, 0L, 0L, 0L, 0L,
- 30L);
- valid(expect);
- strategy.assign(33, 1, 2, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 33L, 0L, 0L, 50L, 30L, 14L, 13L, 20L, 13L, 30L, 20L, 30L, 20L,
13L, 0L, 0L, 0L, 0L,
- 30L);
- valid(expect);
-
- list = Lists.newArrayList();
- for (int i = 0; i < 20; i++) {
- if (i % 2 == 0) {
+ List<Long> list = Lists.newArrayList();
+ for (int i = 0; i < 20; i++) {
list.add(10L);
- } else {
- list.add(20L);
}
- }
+ updateServerResource(clusterManager, list);
+ boolean isThrown = false;
+ try {
+ strategy.assign(100, 2, 1, tags, -1, -1);
+ } catch (Exception e) {
+ isThrown = true;
+ }
+ assertTrue(isThrown);
+ try {
+ strategy.assign(0, 1, 1, tags, -1, -1);
+ } catch (Exception e) {
+ fail();
+ }
+ isThrown = false;
+ try {
+ strategy.assign(10, 1, 1, Sets.newHashSet("fake"), 1, -1);
+ } catch (Exception e) {
+ isThrown = true;
+ }
+ assertTrue(isThrown);
+ strategy.assign(100, 1, 1, tags, -1, -1);
+ List<Long> expect =
+ Lists.newArrayList(
+ 20L, 20L, 20L, 20L, 20L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(75, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L, 0L,
+ 0L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(100, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 20L, 20L, 20L, 20L, 20L, 15L, 15L, 15L, 15L, 15L, 20L, 20L, 20L,
20L, 20L, 0L, 0L, 0L,
+ 0L, 0L);
+ valid(clusterManager, strategy, expect);
+
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ list =
+ Lists.newArrayList(
+ 7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L,
17L, 8L, 1L, 3L, 3L, 6L,
+ 12L);
+ updateServerResource(clusterManager, list);
+ strategy.assign(100, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L,
0L, 0L, 0L, 0L, 0L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(50, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 20L, 0L, 0L, 20L, 10L, 10L, 0L, 20L, 0L, 10L, 20L, 10L, 20L,
0L, 0L, 0L, 0L, 0L,
+ 10L);
+ valid(clusterManager, strategy, expect);
+
+ strategy.assign(75, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 20L, 0L, 0L, 20L, 25L, 10L, 15L, 20L, 15L, 25L, 20L, 25L,
20L, 0L, 0L, 0L, 0L, 0L,
+ 10L);
+ valid(clusterManager, strategy, expect);
+
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ list =
+ Lists.newArrayList(
+ 7L, 18L, 7L, 3L, 19L, 15L, 11L, 10L, 16L, 11L, 14L, 17L, 15L,
17L, 8L, 1L, 3L, 3L, 6L,
+ 12L);
+ updateServerResource(clusterManager, list);
+ strategy.assign(50, 1, 2, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 20L, 0L, 0L, 20L, 0L, 0L, 0L, 20L, 0L, 0L, 20L, 0L, 20L, 0L,
0L, 0L, 0L, 0L, 0L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(75, 1, 2, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 20L, 0L, 0L, 50L, 30L, 0L, 0L, 20L, 0L, 30L, 20L, 30L, 20L,
0L, 0L, 0L, 0L, 0L,
+ 30L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(33, 1, 2, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 33L, 0L, 0L, 50L, 30L, 14L, 13L, 20L, 13L, 30L, 20L, 30L,
20L, 13L, 0L, 0L, 0L,
+ 0L, 30L);
+ valid(clusterManager, strategy, expect);
+
+ list = Lists.newArrayList();
+ for (int i = 0; i < 20; i++) {
+ if (i % 2 == 0) {
+ list.add(10L);
+ } else {
+ list.add(20L);
+ }
+ }
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
- updateServerResource(list);
- strategy.assign(33, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L);
- valid(expect);
- strategy.assign(41, 1, 2, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 17L, 0L, 17L, 0L, 16L,
0L, 16L, 0L, 16L);
- valid(expect);
- strategy.assign(23, 1, 1, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 5L, 7L, 5L, 7L, 5L, 7L, 4L, 6L, 4L, 6L, 0L, 17L, 0L, 17L, 0L, 16L,
0L, 16L, 0L, 16L);
- valid(expect);
- strategy.assign(11, 1, 3, tags, -1, -1);
- expect =
- Lists.newArrayList(
- 5L, 7L, 5L, 7L, 5L, 7L, 4L, 13L, 4L, 13L, 7L, 17L, 6L, 17L, 6L,
16L, 0L, 16L, 0L, 16L);
- valid(expect);
+ Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+ updateServerResource(clusterManager, list);
+ strategy.assign(33, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(41, 1, 2, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 0L, 7L, 0L, 7L, 0L, 7L, 0L, 6L, 0L, 6L, 0L, 17L, 0L, 17L, 0L,
16L, 0L, 16L, 0L, 16L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(23, 1, 1, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 5L, 7L, 5L, 7L, 5L, 7L, 4L, 6L, 4L, 6L, 0L, 17L, 0L, 17L, 0L,
16L, 0L, 16L, 0L, 16L);
+ valid(clusterManager, strategy, expect);
+ strategy.assign(11, 1, 3, tags, -1, -1);
+ expect =
+ Lists.newArrayList(
+ 5L, 7L, 5L, 7L, 5L, 7L, 4L, 13L, 4L, 13L, 7L, 17L, 6L, 17L, 6L,
16L, 0L, 16L, 0L,
+ 16L);
+ valid(clusterManager, strategy, expect);
+ }
}
- private void valid(List<Long> expect) {
+ private void valid(
+ SimpleClusterManager clusterManager,
+ PartitionBalanceAssignmentStrategy strategy,
+ List<Long> expect) {
assertEquals(20, expect.size());
int i = 0;
List<ServerNode> list = clusterManager.getServerList(tags);
@@ -206,13 +204,7 @@ public class PartitionBalanceAssignmentStrategyTest {
}
}
- @AfterEach
- public void tearDown() throws IOException {
- clusterManager.clear();
- clusterManager.close();
- }
-
- void updateServerResource(List<Long> resources) {
+ void updateServerResource(SimpleClusterManager clusterManager, List<Long>
resources) {
for (int i = 0; i < 20; i++) {
ServerNode node =
new ServerNode(
@@ -229,76 +221,88 @@ public class PartitionBalanceAssignmentStrategyTest {
}
@Test
- public void testAssignmentShuffleNodesNum() {
- Set<String> serverTags = Sets.newHashSet("tag-1");
+ public void testAssignmentShuffleNodesNum() throws Exception {
+ CoordinatorConf ssc = new CoordinatorConf();
+ ssc.set(
+ CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
+ AbstractAssignmentStrategy.SelectPartitionStrategyName.ROUND);
+ ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ PartitionBalanceAssignmentStrategy strategy =
+ new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- for (int i = 0; i < 20; ++i) {
- clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
- }
+ Set<String> serverTags = Sets.newHashSet("tag-1");
+
+ for (int i = 0; i < 20; ++i) {
+ clusterManager.add(
+ new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
- /**
- * case1: user specify the illegal shuffle node num(<0) it will use the
default shuffle nodes
- * num when having enough servers.
- */
- PartitionRangeAssignment pra = strategy.assign(100, 1, 1, serverTags, -1,
-1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case2: user specify the illegal shuffle node num(==0) it will use the
default shuffle nodes
- * num when having enough servers.
- */
- pra = strategy.assign(100, 1, 1, serverTags, 0, -1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case3: user specify the illegal shuffle node num(>default max
limitation) it will use the
- * default shuffle nodes num when having enough servers
- */
- pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax + 10, -1);
- assertEquals(
- shuffleNodesMax,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case4: user specify the legal shuffle node num, it will use the
customized shuffle nodes num
- * when having enough servers
- */
- pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax - 1, -1);
- assertEquals(
- shuffleNodesMax - 1,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
-
- /**
- * case5: user specify the legal shuffle node num, but cluster don't have
enough servers, it
- * will return the remaining servers.
- */
- serverTags = Sets.newHashSet("tag-2");
- for (int i = 0; i < shuffleNodesMax - 1; ++i) {
- clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20
- i, 0, serverTags));
+ /**
+ * case1: user specify the illegal shuffle node num(<0) it will use the
default shuffle nodes
+ * num when having enough servers.
+ */
+ PartitionRangeAssignment pra = strategy.assign(100, 1, 1, serverTags,
-1, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case2: user specify the illegal shuffle node num(==0) it will use the
default shuffle nodes
+ * num when having enough servers.
+ */
+ pra = strategy.assign(100, 1, 1, serverTags, 0, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case3: user specify the illegal shuffle node num(>default max
limitation) it will use the
+ * default shuffle nodes num when having enough servers
+ */
+ pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax + 10, -1);
+ assertEquals(
+ shuffleNodesMax,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case4: user specify the legal shuffle node num, it will use the
customized shuffle nodes
+ * num when having enough servers
+ */
+ pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax - 1, -1);
+ assertEquals(
+ shuffleNodesMax - 1,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
+
+ /**
+ * case5: user specify the legal shuffle node num, but cluster don't
have enough servers, it
+ * will return the remaining servers.
+ */
+ serverTags = Sets.newHashSet("tag-2");
+ for (int i = 0; i < shuffleNodesMax - 1; ++i) {
+ clusterManager.add(
+ new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, 20 - i, 0,
serverTags));
+ }
+ pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
+ assertEquals(
+ shuffleNodesMax - 1,
+ pra.getAssignments().values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet())
+ .size());
}
- pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1);
- assertEquals(
- shuffleNodesMax - 1,
- pra.getAssignments().values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toSet())
- .size());
}
@Test
@@ -446,31 +450,33 @@ public class PartitionBalanceAssignmentStrategyTest {
CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
shuffleNodesMax);
- clusterManager = new SimpleClusterManager(ssc, new Configuration());
- strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
- List<Long> list =
- Lists.newArrayList(
- 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L,
20L, 20L, 20L, 20L,
- 20L, 20L, 20L);
- updateServerResource(list);
- strategy.assign(100, 1, 2, tags, 5, 20);
- List<Long> expect =
- Lists.newArrayList(
- 40L, 40L, 40L, 40L, 40L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L);
- valid(expect);
-
- strategy.assign(28, 1, 2, tags, 5, 20);
- expect =
- Lists.newArrayList(
- 40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L,
- 0L);
- valid(expect);
-
- strategy.assign(29, 1, 2, tags, 5, 4);
- expect =
- Lists.newArrayList(
- 40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 11L, 12L, 12L,
12L, 11L, 0L, 0L, 0L,
- 0L, 0L);
- valid(expect);
+ try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc,
new Configuration())) {
+ PartitionBalanceAssignmentStrategy strategy =
+ new PartitionBalanceAssignmentStrategy(clusterManager, ssc);
+ List<Long> list =
+ Lists.newArrayList(
+ 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L, 20L,
20L, 20L, 20L, 20L,
+ 20L, 20L, 20L);
+ updateServerResource(clusterManager, list);
+ strategy.assign(100, 1, 2, tags, 5, 20);
+ List<Long> expect =
+ Lists.newArrayList(
+ 40L, 40L, 40L, 40L, 40L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L);
+ valid(clusterManager, strategy, expect);
+
+ strategy.assign(28, 1, 2, tags, 5, 20);
+ expect =
+ Lists.newArrayList(
+ 40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 0L, 0L, 0L,
0L, 0L, 0L, 0L, 0L, 0L,
+ 0L);
+ valid(clusterManager, strategy, expect);
+
+ strategy.assign(29, 1, 2, tags, 5, 4);
+ expect =
+ Lists.newArrayList(
+ 40L, 40L, 40L, 40L, 40L, 11L, 12L, 12L, 11L, 10L, 11L, 12L, 12L,
12L, 11L, 0L, 0L, 0L,
+ 0L, 0L);
+ valid(clusterManager, strategy, expect);
+ }
}
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index ba0a456a0..cd2592919 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,6 +68,11 @@ public class AppBalanceSelectStorageStrategyTest {
applicationManager.closeDetectStorageScheduler();
}
+ @AfterEach
+ public void tearDown() {
+ applicationManager.close();
+ }
+
@Test
public void selectStorageTest() throws Exception {
String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR +
remotePath2;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 770086b2d..642eec22c 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -72,6 +73,11 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
applicationManager.closeDetectStorageScheduler();
}
+ @AfterEach
+ public void tearDown() {
+ applicationManager.close();
+ }
+
@Test
public void selectStorageTest() throws Exception {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
index 53df37d9d..7f2a2b0f4 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/web/UniffleJavaProcess.java
@@ -20,6 +20,9 @@ package org.apache.uniffle.coordinator.web;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
/**
* Simulate Java process execution for testing purposes. This method can truly
simulate the
@@ -63,6 +66,8 @@ public class UniffleJavaProcess {
process.destroy();
process.waitFor();
process.exitValue();
+ // Wait for a while to ensure the port is released
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
}
}
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
index 05f0a8f21..ab9e931c0 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHadoopTest.java
@@ -70,90 +70,91 @@ public class AccessCandidatesCheckerHadoopTest extends
HadoopTestBase {
conf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker");
- ApplicationManager applicationManager = new ApplicationManager(conf);
- // file load checking at startup
- Exception expectedException = null;
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- expectedException = e;
- }
- assertNotNull(expectedException);
- assertTrue(
- expectedException
- .getMessage()
- .contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
- conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidatesFile);
- expectedException = null;
- try {
- new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
- } catch (RuntimeException e) {
- expectedException = e;
- }
- assertNotNull(expectedException);
- assertTrue(
- expectedException
- .getMessage()
- .contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+ try (ApplicationManager applicationManager = new ApplicationManager(conf))
{
+ // file load checking at startup
+ Exception expectedException = null;
+ try {
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ } catch (RuntimeException e) {
+ expectedException = e;
+ }
+ assertNotNull(expectedException);
+ assertTrue(
+ expectedException
+ .getMessage()
+ .contains(
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
+ conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidatesFile);
+ expectedException = null;
+ try {
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
new Configuration());
+ } catch (RuntimeException e) {
+ expectedException = e;
+ }
+ assertNotNull(expectedException);
+ assertTrue(
+ expectedException
+ .getMessage()
+ .contains(
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
- Path path = new Path(candidatesFile);
- FSDataOutputStream out = fs.create(path);
+ Path path = new Path(candidatesFile);
+ FSDataOutputStream out = fs.create(path);
- PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
- printWriter.println("9527");
- printWriter.println(" 135 ");
- printWriter.println("2 ");
- printWriter.flush();
- printWriter.close();
- AccessManager accessManager =
- new AccessManager(conf, null, applicationManager.getQuotaManager(),
hadoopConf);
- AccessCandidatesChecker checker =
- (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
- // load the config at the beginning
- sleep(1200);
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(out));
+ printWriter.println("9527");
+ printWriter.println(" 135 ");
+ printWriter.println("2 ");
+ printWriter.flush();
+ printWriter.close();
+ AccessManager accessManager =
+ new AccessManager(conf, null, applicationManager.getQuotaManager(),
hadoopConf);
+ AccessCandidatesChecker checker =
+ (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
+ // load the config at the beginning
+ sleep(1200);
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // ignore empty or wrong content
- printWriter.println("");
- printWriter.flush();
- printWriter.close();
- sleep(1300);
- assertTrue(fs.exists(path));
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ // ignore empty or wrong content
+ printWriter.println("");
+ printWriter.flush();
+ printWriter.close();
+ sleep(1300);
+ assertTrue(fs.exists(path));
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // the config will not be changed when the conf file is deleted
- fs.delete(path, true);
- assertFalse(fs.exists(path));
- sleep(1200);
- assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- assertTrue(checker.check(new AccessInfo("135")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1")).isSuccess());
- assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
+ // the config will not be changed when the conf file is deleted
+ fs.delete(path, true);
+ assertFalse(fs.exists(path));
+ sleep(1200);
+ assertEquals(Sets.newHashSet("2", "9527", "135"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("135")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1")).isSuccess());
+ assertFalse(checker.check(new AccessInfo("1_2")).isSuccess());
- // the normal update config process, move the new conf file to the old one
- Path tmpPath = new Path(candidatesFile + ".tmp");
- out = fs.create(tmpPath);
- printWriter = new PrintWriter(new OutputStreamWriter(out));
- printWriter.println("9527");
- printWriter.println(" 1357 ");
- printWriter.flush();
- printWriter.close();
- fs.rename(tmpPath, path);
- sleep(1200);
- assertEquals(Sets.newHashSet("1357", "9527"),
checker.getCandidates().get());
- assertTrue(checker.check(new AccessInfo("1357")).isSuccess());
- assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
- checker.close();
+ // the normal update config process, move the new conf file to the old
one
+ Path tmpPath = new Path(candidatesFile + ".tmp");
+ out = fs.create(tmpPath);
+ printWriter = new PrintWriter(new OutputStreamWriter(out));
+ printWriter.println("9527");
+ printWriter.println(" 1357 ");
+ printWriter.flush();
+ printWriter.close();
+ fs.rename(tmpPath, path);
+ sleep(1200);
+ assertEquals(Sets.newHashSet("1357", "9527"),
checker.getCandidates().get());
+ assertTrue(checker.check(new AccessInfo("1357")).isSuccess());
+ assertTrue(checker.check(new AccessInfo("9527")).isSuccess());
+ checker.close();
+ }
}
}