This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8847ece1 [Refactor] Make coordinator class more organized (#386)
8847ece1 is described below
commit 8847ece10a21800e5deb608441fcc6129a6b841d
Author: jokercurry <[email protected]>
AuthorDate: Tue Dec 6 15:38:58 2022 +0800
[Refactor] Make coordinator class more organized (#386)
### What changes were proposed in this pull request?
Split the coordinator classes into separate sub-packages (access, metric, strategy, util), grouped by related functionality.
### Why are the changes needed?
To resolve #384
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UTs).
---
.../apache/uniffle/coordinator/AccessManager.java | 4 +++
.../uniffle/coordinator/ApplicationManager.java | 17 ++++++++++--
.../uniffle/coordinator/CoordinatorConf.java | 8 ++++--
.../coordinator/CoordinatorGrpcService.java | 6 +++-
.../uniffle/coordinator/CoordinatorServer.java | 4 +++
.../org/apache/uniffle/coordinator/ServerNode.java | 2 +-
.../uniffle/coordinator/SimpleClusterManager.java | 3 +-
.../{ => access}/AccessCheckResult.java | 2 +-
.../coordinator/{ => access}/AccessInfo.java | 2 +-
.../checker}/AbstractAccessChecker.java | 4 ++-
.../checker}/AccessCandidatesChecker.java | 9 +++++-
.../{ => access/checker}/AccessChecker.java | 5 +++-
.../checker}/AccessClusterLoadChecker.java | 10 ++++++-
.../{ => access/checker}/AccessQuotaChecker.java | 8 +++++-
.../{ => metric}/CoordinatorGrpcMetrics.java | 2 +-
.../{ => metric}/CoordinatorMetrics.java | 32 +++++++++++-----------
.../assignment}/AbstractAssignmentStrategy.java | 11 +++++++-
.../assignment}/AssignmentStrategy.java | 2 +-
.../assignment}/AssignmentStrategyFactory.java | 5 +++-
.../assignment}/BasicAssignmentStrategy.java | 5 +++-
.../PartitionBalanceAssignmentStrategy.java | 5 +++-
.../assignment}/PartitionRangeAssignment.java | 3 +-
.../host}/BasicHostAssignmentStrategy.java | 4 ++-
.../host}/HostAssignmentStrategy.java | 4 ++-
.../host}/MustDiffHostAssignmentStrategy.java | 4 ++-
.../host}/PreferDiffHostAssignmentStrategy.java | 4 ++-
.../ContinuousSelectPartitionStrategy.java | 4 ++-
.../partition}/RoundSelectPartitionStrategy.java | 4 ++-
.../partition}/SelectPartitionStrategy.java | 3 +-
.../storage}/AbstractSelectStorageStrategy.java | 3 +-
.../storage}/AppBalanceSelectStorageStrategy.java | 6 ++--
.../LowestIOSampleCostSelectStorageStrategy.java | 6 ++--
.../{ => strategy/storage}/RankValue.java | 2 +-
.../storage}/SelectStorageStrategy.java | 2 +-
.../coordinator/{ => util}/CoordinatorUtils.java | 3 +-
.../coordinator/ApplicationManagerTest.java | 1 +
.../uniffle/coordinator/ClientConfManagerTest.java | 2 ++
.../uniffle/coordinator/QuotaManagerTest.java | 2 +-
.../coordinator/SimpleClusterManagerTest.java | 2 ++
.../{ => access}/AccessManagerTest.java | 15 ++++++----
.../{ => checker}/AccessCandidatesCheckerTest.java | 15 +++++++---
.../AccessClusterLoadCheckerTest.java | 17 ++++++++++--
.../{ => checker}/AccessQuotaCheckerTest.java | 19 ++++++++++---
.../{ => metric}/CoordinatorMetricsTest.java | 4 ++-
.../assignment}/BasicAssignmentStrategyTest.java | 5 +++-
.../PartitionBalanceAssignmentStrategyTest.java | 6 +++-
.../assignment}/PartitionRangeAssignmentTest.java | 3 +-
.../assignment}/PartitionRangeTest.java | 2 +-
.../ContinuousSelectPartitionStrategyTest.java | 6 ++--
.../AppBalanceSelectStorageStrategyTest.java | 11 +++++---
...owestIOSampleCostSelectStorageStrategyTest.java | 9 ++++--
.../{ => util}/CoordinatorUtilsTest.java | 2 +-
docs/coordinator_guide.md | 5 +++-
docs/uniffle-migration-guide.md | 25 +++++++++++++++++
.../test/AccessCandidatesCheckerHdfsTest.java | 12 ++++----
.../AccessCandidatesCheckerKerberizedHdfsTest.java | 2 +-
.../org/apache/uniffle/test/AccessClusterTest.java | 10 +++----
.../uniffle/test/CoordinatorGrpcServerTest.java | 2 +-
.../apache/uniffle/test/CoordinatorGrpcTest.java | 2 +-
.../apache/uniffle/test/IntegrationTestBase.java | 2 +-
.../org/apache/uniffle/test/AutoAccessTest.java | 4 +--
.../test/SparkSQLWithDelegationShuffleManager.java | 4 +--
...arkSQLWithDelegationShuffleManagerFallback.java | 4 +--
.../ContinuousSelectPartitionStrategyTest.java | 2 +-
64 files changed, 283 insertions(+), 110 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index f9a4a93f..4c7b3665 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -28,6 +28,10 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
public class AccessManager {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 73d21683..ecc78c0d 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -38,12 +38,19 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import
org.apache.uniffle.coordinator.strategy.storage.AppBalanceSelectStorageStrategy;
+import
org.apache.uniffle.coordinator.strategy.storage.LowestIOSampleCostSelectStorageStrategy;
+import org.apache.uniffle.coordinator.strategy.storage.RankValue;
+import org.apache.uniffle.coordinator.strategy.storage.SelectStorageStrategy;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
public class ApplicationManager {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationManager.class);
// TODO: Add anomaly detection for other storage
- public static final List<String> REMOTE_PATH_SCHEMA = Arrays.asList("hdfs");
+ private static final List<String> REMOTE_PATH_SCHEMA = Arrays.asList("hdfs");
private final long expired;
private final StrategyName storageStrategy;
private final SelectStorageStrategy selectStorageStrategy;
@@ -224,7 +231,7 @@ public class ApplicationManager {
}
@VisibleForTesting
- protected Map<String, RankValue> getRemoteStoragePathRankValue() {
+ public Map<String, RankValue> getRemoteStoragePathRankValue() {
return remoteStoragePathRankValue;
}
@@ -244,7 +251,7 @@ public class ApplicationManager {
}
@VisibleForTesting
- protected boolean hasErrorInStatusCheck() {
+ public boolean hasErrorInStatusCheck() {
return hasErrorInStatusCheck;
}
@@ -329,6 +336,10 @@ public class ApplicationManager {
return quotaManager;
}
+ public static List<String> getPathSchema() {
+ return REMOTE_PATH_SCHEMA;
+ }
+
public enum StrategyName {
APP_BALANCE,
IO_SAMPLE
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index c696dc83..bea53018 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -25,9 +25,11 @@ import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;
+import
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
+import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
-import static
org.apache.uniffle.coordinator.AssignmentStrategyFactory.StrategyName.PARTITION_BALANCE;
+import static
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory.StrategyName.PARTITION_BALANCE;
/**
* Configuration for Coordinator Service and rss-cluster, including service
port,
@@ -77,8 +79,8 @@ public class CoordinatorConf extends RssBaseConf {
.key("rss.coordinator.access.checkers")
.stringType()
.asList()
- .defaultValues("org.apache.uniffle.coordinator.AccessClusterLoadChecker",
- "org.apache.uniffle.coordinator.AccessQuotaChecker")
+
.defaultValues("org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker",
+ "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker")
.withDescription("Access checkers");
public static final ConfigOption<Integer>
COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions
.key("rss.coordinator.access.candidates.updateIntervalSec")
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 75e3dce7..415f31e2 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -33,6 +33,10 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
import org.apache.uniffle.proto.RssProtos.AccessClusterRequest;
import org.apache.uniffle.proto.RssProtos.AccessClusterResponse;
@@ -67,7 +71,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
private final CoordinatorServer coordinatorServer;
- CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
+ public CoordinatorGrpcService(CoordinatorServer coordinatorServer) {
this.coordinatorServer = coordinatorServer;
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 4314d625..f93c4335 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -31,6 +31,10 @@ import org.apache.uniffle.common.security.SecurityConfig;
import org.apache.uniffle.common.security.SecurityContextFactory;
import org.apache.uniffle.common.web.CommonMetricsServlet;
import org.apache.uniffle.common.web.JettyServer;
+import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
+import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index c25ed36d..9f18c330 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -144,7 +144,7 @@ public class ServerNode implements Comparable<ServerNode> {
return false;
}
- long getTotalMemory() {
+ public long getTotalMemory() {
return availableMemory + usedMemory;
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 0ba266cf..34612e98 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
public class SimpleClusterManager implements ClusterManager {
@@ -226,7 +227,7 @@ public class SimpleClusterManager implements ClusterManager
{
}
@VisibleForTesting
- void clear() {
+ public void clear() {
servers.clear();
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessCheckResult.java
similarity index 96%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessCheckResult.java
index 3e1d8bc8..4cb28abc 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCheckResult.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessCheckResult.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access;
public class AccessCheckResult {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessInfo.java
similarity index 97%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessInfo.java
index f07bc2e7..3c4e494a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/AccessInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access;
import java.util.Collections;
import java.util.Map;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAccessChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java
similarity index 90%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAccessChecker.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java
index c6ce1c96..677890c5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAccessChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access.checker;
+
+import org.apache.uniffle.coordinator.AccessManager;
/**
* Abstract class for checking the access info from the client-side.
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
similarity index 94%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
index f6a4219a..bbc8f382 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access.checker;
import java.nio.charset.StandardCharsets;
import java.util.Set;
@@ -40,6 +40,11 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
/**
* AccessCandidatesChecker maintain a list of candidate access id and update
it periodically,
@@ -82,6 +87,7 @@ public class AccessCandidatesChecker extends
AbstractAccessChecker {
this::updateAccessCandidates, 0, updateIntervalS, TimeUnit.SECONDS);
}
+ @Override
public AccessCheckResult check(AccessInfo accessInfo) {
String accessId = accessInfo.getAccessId().trim();
if (!candidates.get().contains(accessId)) {
@@ -94,6 +100,7 @@ public class AccessCandidatesChecker extends
AbstractAccessChecker {
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}
+ @Override
public void close() {
if (updateAccessCandidatesSES != null) {
updateAccessCandidatesSES.shutdownNow();
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java
similarity index 86%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessChecker.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java
index f703d33e..71a380a2 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java
@@ -15,10 +15,13 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access.checker;
import java.io.Closeable;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+
/**
* Interface for checking the access info from the client-side.
*/
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
similarity index 90%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index f460a985..773e68a8 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access.checker;
import java.util.List;
import java.util.Set;
@@ -25,6 +25,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static
org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
@@ -54,6 +61,7 @@ public class AccessClusterLoadChecker extends
AbstractAccessChecker {
this.defaultRequiredShuffleServerNumber =
conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
}
+ @Override
public AccessCheckResult check(AccessInfo accessInfo) {
Set<String> tags = accessInfo.getTags();
List<ServerNode> servers = clusterManager.getServerList(tags);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessQuotaChecker.java
similarity index 86%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessQuotaChecker.java
index 78923ab8..c8036b54 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessQuotaChecker.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access.checker;
import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;
@@ -25,6 +25,12 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.QuotaManager;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
/**
* This checker limits the number of apps that different users can submit.
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
similarity index 97%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcMetrics.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
index e731b9db..eb141a63 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.metric;
import org.apache.uniffle.common.metrics.GRPCMetrics;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
similarity index 81%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index 10bc8b2f..02bfd85c 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.metric;
import java.util.Map;
@@ -42,16 +42,16 @@ public class CoordinatorMetrics {
private static final String TOTAL_QUOTA_DENIED_REQUEST =
"total_quota_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX =
"remote_storage_in_used_";
- static Gauge gaugeTotalServerNum;
- static Gauge gaugeExcludeServerNum;
- static Gauge gaugeUnhealthyServerNum;
- static Gauge gaugeRunningAppNum;
- static Counter counterTotalAppNum;
- static Counter counterTotalAccessRequest;
- static Counter counterTotalCandidatesDeniedRequest;
- static Counter counterTotalQuotaDeniedRequest;
- static Counter counterTotalLoadDeniedRequest;
- static final Map<String, Gauge> gaugeInUsedRemoteStorage =
Maps.newConcurrentMap();
+ public static Gauge gaugeTotalServerNum;
+ public static Gauge gaugeExcludeServerNum;
+ public static Gauge gaugeUnhealthyServerNum;
+ public static Gauge gaugeRunningAppNum;
+ public static Counter counterTotalAppNum;
+ public static Counter counterTotalAccessRequest;
+ public static Counter counterTotalCandidatesDeniedRequest;
+ public static Counter counterTotalQuotaDeniedRequest;
+ public static Counter counterTotalLoadDeniedRequest;
+ public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE =
Maps.newConcurrentMap();
private static MetricsManager metricsManager;
private static boolean isRegister = false;
@@ -72,7 +72,7 @@ public class CoordinatorMetrics {
@VisibleForTesting
public static void clear() {
isRegister = false;
- gaugeInUsedRemoteStorage.clear();
+ GAUGE_USED_REMOTE_STORAGE.clear();
CollectorRegistry.defaultRegistry.clear();
}
@@ -82,17 +82,17 @@ public class CoordinatorMetrics {
public static void addDynamicGaugeForRemoteStorage(String storageHost) {
if (!StringUtils.isEmpty(storageHost)) {
- if (!gaugeInUsedRemoteStorage.containsKey(storageHost)) {
+ if (!GAUGE_USED_REMOTE_STORAGE.containsKey(storageHost)) {
String metricName = REMOTE_STORAGE_IN_USED_PREFIX +
RssUtils.getMetricNameForHostName(storageHost);
- gaugeInUsedRemoteStorage.putIfAbsent(storageHost,
+ GAUGE_USED_REMOTE_STORAGE.putIfAbsent(storageHost,
metricsManager.addGauge(metricName));
}
}
}
public static void updateDynamicGaugeForRemoteStorage(String storageHost,
double value) {
- if (gaugeInUsedRemoteStorage.containsKey(storageHost)) {
- gaugeInUsedRemoteStorage.get(storageHost).set(value);
+ if (GAUGE_USED_REMOTE_STORAGE.containsKey(storageHost)) {
+ GAUGE_USED_REMOTE_STORAGE.get(storageHost).set(value);
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
similarity index 82%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
index e795a63d..be6d371b 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AbstractAssignmentStrategy.java
@@ -15,12 +15,21 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.List;
import java.util.SortedMap;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import
org.apache.uniffle.coordinator.strategy.host.BasicHostAssignmentStrategy;
+import org.apache.uniffle.coordinator.strategy.host.HostAssignmentStrategy;
+import
org.apache.uniffle.coordinator.strategy.host.MustDiffHostAssignmentStrategy;
+import
org.apache.uniffle.coordinator.strategy.host.PreferDiffHostAssignmentStrategy;
+import
org.apache.uniffle.coordinator.strategy.partition.ContinuousSelectPartitionStrategy;
+import
org.apache.uniffle.coordinator.strategy.partition.RoundSelectPartitionStrategy;
+import
org.apache.uniffle.coordinator.strategy.partition.SelectPartitionStrategy;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_HOST_STRATEGY;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
similarity index 94%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
index 88c0027a..017fd8f6 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.Set;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategyFactory.java
similarity index 90%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategyFactory.java
index 0ae39314..9a18afae 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AssignmentStrategyFactory.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/AssignmentStrategyFactory.java
@@ -15,7 +15,10 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
+
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
public class AssignmentStrategyFactory {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
similarity index 93%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
index d56cef56..47557e13 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.Collections;
import java.util.List;
@@ -26,6 +26,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
public class BasicAssignmentStrategy extends AbstractAssignmentStrategy {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
similarity index 96%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 088ce5bc..3bc6b14f 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.Collection;
import java.util.Comparator;
@@ -30,6 +30,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
/**
* PartitionBalanceAssignmentStrategy will consider allocating partitions from
two aspects
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionRangeAssignment.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignment.java
similarity index 96%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionRangeAssignment.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignment.java
index c6eee79e..5dc4e125 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PartitionRangeAssignment.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignment.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.ArrayList;
import java.util.List;
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import com.google.common.base.Objects;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.proto.RssProtos;
public class PartitionRangeAssignment {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicHostAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/BasicHostAssignmentStrategy.java
similarity index 90%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/BasicHostAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/BasicHostAssignmentStrategy.java
index a8bfc0bf..e52cf2cf 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/BasicHostAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/BasicHostAssignmentStrategy.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.host;
import java.util.List;
+import org.apache.uniffle.coordinator.ServerNode;
+
public class BasicHostAssignmentStrategy implements HostAssignmentStrategy {
@Override
public List<ServerNode> assign(List<ServerNode> allNodes, int expectNum) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/HostAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/HostAssignmentStrategy.java
similarity index 89%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/HostAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/HostAssignmentStrategy.java
index b8c9552a..9c42402c 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/HostAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/HostAssignmentStrategy.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.host;
import java.util.List;
+import org.apache.uniffle.coordinator.ServerNode;
+
public interface HostAssignmentStrategy {
List<ServerNode> assign(List<ServerNode> allNodes, int expectNum);
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/MustDiffHostAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/MustDiffHostAssignmentStrategy.java
similarity index 93%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/MustDiffHostAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/MustDiffHostAssignmentStrategy.java
index d4c05830..6f6b55de 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/MustDiffHostAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/MustDiffHostAssignmentStrategy.java
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.host;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.uniffle.coordinator.ServerNode;
+
public class MustDiffHostAssignmentStrategy implements HostAssignmentStrategy {
@Override
public List<ServerNode> assign(List<ServerNode> allNodes, int expectNum) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PreferDiffHostAssignmentStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/PreferDiffHostAssignmentStrategy.java
similarity index 94%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/PreferDiffHostAssignmentStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/PreferDiffHostAssignmentStrategy.java
index c2ec64a0..ae07b67f 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/PreferDiffHostAssignmentStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/host/PreferDiffHostAssignmentStrategy.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.host;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.uniffle.coordinator.ServerNode;
+
public class PreferDiffHostAssignmentStrategy implements
HostAssignmentStrategy {
private MustDiffHostAssignmentStrategy strategy;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategy.java
similarity index 92%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategy.java
index 9a0f9bbb..ad85e142 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.partition;
import java.util.List;
import java.util.SortedMap;
@@ -24,6 +24,8 @@ import java.util.TreeMap;
import com.google.common.collect.Lists;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
public class ContinuousSelectPartitionStrategy implements
SelectPartitionStrategy {
@Override
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/RoundSelectPartitionStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/RoundSelectPartitionStrategy.java
similarity index 91%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/RoundSelectPartitionStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/RoundSelectPartitionStrategy.java
index affaa641..a4a43d38 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/RoundSelectPartitionStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/RoundSelectPartitionStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.partition;
import java.util.List;
import java.util.SortedMap;
@@ -24,6 +24,8 @@ import java.util.TreeMap;
import com.google.common.collect.Lists;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
public class RoundSelectPartitionStrategy implements SelectPartitionStrategy {
@Override
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectPartitionStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/SelectPartitionStrategy.java
similarity index 91%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/SelectPartitionStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/SelectPartitionStrategy.java
index e785c0d3..aa801ee5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectPartitionStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/partition/SelectPartitionStrategy.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.partition;
import java.util.List;
import java.util.SortedMap;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
public interface SelectPartitionStrategy {
/**
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
similarity index 96%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index 9025ddb4..a6564ce2 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AbstractSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import java.io.IOException;
import java.util.Map;
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.coordinator.CoordinatorConf;
/**
* This is a simple implementation class, which provides some methods to check
whether the path is normal
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
similarity index 95%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index 4f6f6535..94e421cf 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import java.util.Comparator;
import java.util.List;
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
/**
* AppBalanceSelectStorageStrategy will consider the number of apps allocated
on each remote path is balanced.
@@ -83,7 +85,7 @@ public class AppBalanceSelectStorageStrategy extends
AbstractSelectStorageStrate
uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
if (remoteStoragePathRankValue.size() > 1) {
for (Map.Entry<String, RankValue> uri : uris) {
- if
(uri.getKey().startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) {
+ if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
rankValue.setHealthy(new AtomicBoolean(true));
Path remotePath = new Path(uri.getKey());
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
similarity index 95%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index 551b8f4c..f3d9d61f 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import java.util.List;
import java.util.Map;
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
/**
* LowestIOSampleCostSelectStorageStrategy considers that when allocating apps
to different remote paths,
@@ -98,7 +100,7 @@ public class LowestIOSampleCostSelectStorageStrategy extends
AbstractSelectStora
uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
if (remoteStoragePathRankValue.size() > 1) {
for (Map.Entry<String, RankValue> uri : uris) {
- if
(uri.getKey().startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) {
+ if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
Path remotePath = new Path(uri.getKey());
String rssTest = uri.getKey() + "/rssTest";
Path testPath = new Path(rssTest);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/RankValue.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/RankValue.java
similarity index 97%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/RankValue.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/RankValue.java
index 349698b1..66e290c8 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/RankValue.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/RankValue.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/SelectStorageStrategy.java
similarity index 94%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/SelectStorageStrategy.java
index 52f7073c..f37b0b2c 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/SelectStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import org.apache.uniffle.common.RemoteStorageInfo;
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
similarity index 98%
rename from
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
rename to
coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index 3db66278..c1247e30 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.util;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.util.Constants;
+import
org.apache.uniffle.coordinator.strategy.assignment.PartitionRangeAssignment;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.GetShuffleAssignmentsResponse;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
index 64150d42..1a1cd6bf 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
index 11952d59..2622c0ed 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java
@@ -38,6 +38,8 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import
org.apache.uniffle.coordinator.strategy.storage.LowestIOSampleCostSelectStorageStrategy;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index 3ead278e..52679692 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -98,7 +98,7 @@ public class QuotaManagerTest {
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
quotaFile);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
-
Lists.newArrayList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+
Lists.newArrayList("org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
ApplicationManager applicationManager = new ApplicationManager(conf);
Thread.sleep(500);
// it didn't detectUserResource because
`org.apache.uniffle.coordinator.AccessQuotaChecker` is not configured
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index df73bfa2..1fd84058 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -36,6 +36,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
similarity index 85%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
index 8870d4fa..df1ce96e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/access/AccessManagerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.access;
import java.util.Collections;
import java.util.Random;
@@ -27,6 +27,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.access.checker.AbstractAccessChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -56,7 +61,7 @@ public class AccessManagerTest {
assertTrue(e.getMessage().startsWith(expectedMessage));
}
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
+
"com.Dummy,org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessChecker");
try {
new AccessManager(conf, null, applicationManager.getQuotaManager(), new
Configuration());
} catch (RuntimeException e) {
@@ -74,15 +79,15 @@ public class AccessManagerTest {
accessManager.close();
// test mock checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
+
"org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
accessManager = new AccessManager(conf, null,
applicationManager.getQuotaManager(), new Configuration());
assertEquals(1, accessManager.getAccessCheckers().size());
assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock1")).isSuccess());
assertTrue(accessManager.handleAccessRequest(new
AccessInfo("mock2")).isSuccess());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
-
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
- +
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
+
"org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
+ +
"org.apache.uniffle.coordinator.access.AccessManagerTest$MockAccessCheckerAlwaysFalse");
accessManager = new AccessManager(conf, null,
applicationManager.getQuotaManager(), new Configuration());
assertEquals(2, accessManager.getAccessCheckers().size());
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
similarity index 88%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
index c8e80883..78ef164d 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessCandidatesCheckerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.checker;
import java.io.File;
import java.io.FileWriter;
@@ -30,6 +30,13 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -57,7 +64,7 @@ public class AccessCandidatesCheckerTest {
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
tempDir.toURI().toString());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker");
final ApplicationManager applicationManager = new ApplicationManager(conf);
// file load checking at startup
Exception expectedException = null;
@@ -68,7 +75,7 @@ public class AccessCandidatesCheckerTest {
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.AccessCandidatesChecker.<init>()"));
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
cfgFile.toURI().toString());
expectedException = null;
try {
@@ -78,7 +85,7 @@ public class AccessCandidatesCheckerTest {
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.AccessCandidatesChecker.<init>()"));
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
// load the config at the beginning
FileWriter fileWriter = new FileWriter(cfgFile);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
similarity index 89%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
index b916d575..7fa76391 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.checker;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,6 +30,16 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
import static
org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE;
@@ -83,7 +93,8 @@ public class AccessClusterLoadCheckerTest {
when(clusterManager.getServerList(any())).thenReturn(nodes);
CoordinatorConf conf = new CoordinatorConf();
- conf.set(COORDINATOR_ACCESS_CHECKERS,
Arrays.asList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+ conf.set(COORDINATOR_ACCESS_CHECKERS,
+
Arrays.asList("org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
ApplicationManager applicationManager = new ApplicationManager(conf);
@@ -150,7 +161,7 @@ public class AccessClusterLoadCheckerTest {
getClass().getClassLoader().getResource("coordinator.conf")).getFile();
CoordinatorConf conf = new CoordinatorConf(filePath);
conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
ApplicationManager applicationManager = new ApplicationManager(conf);
AccessManager accessManager = new AccessManager(conf, clusterManager,
applicationManager.getQuotaManager(), new Configuration());
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
similarity index 85%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
index 60dd4f7f..2c1eb203 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.checker;
import java.util.Arrays;
import java.util.Collections;
@@ -30,6 +30,17 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.coordinator.AccessManager;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.ClusterManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker;
+import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
import static
org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM;
@@ -81,7 +92,7 @@ public class AccessQuotaCheckerTest {
CoordinatorConf conf = new CoordinatorConf();
conf.set(COORDINATOR_ACCESS_CHECKERS,
-
Collections.singletonList("org.apache.uniffle.coordinator.AccessQuotaChecker"));
+
Collections.singletonList("org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker"));
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
ApplicationManager applicationManager = new ApplicationManager(conf);
AccessManager accessManager = new AccessManager(conf, clusterManager,
@@ -121,8 +132,8 @@ public class AccessQuotaCheckerTest {
*/
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 10);
conf.set(COORDINATOR_ACCESS_CHECKERS,
- Arrays.asList("org.apache.uniffle.coordinator.AccessQuotaChecker",
- "org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+
Arrays.asList("org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker",
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
applicationManager = new ApplicationManager(conf);
accessManager = new AccessManager(conf, clusterManager,
applicationManager.getQuotaManager(), new Configuration());
accessQuotaChecker = (AccessQuotaChecker)
accessManager.getAccessCheckers().get(0);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
similarity index 96%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index b1bd1e0e..0fcb3d04 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorMetricsTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.metric;
import java.io.File;
import java.io.FileWriter;
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
similarity index 97%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
index 6a2d3738..635dce00 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.io.IOException;
import java.util.Collection;
@@ -34,6 +34,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
similarity index 98%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
index 43b2e0d7..e7b2e525 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.io.IOException;
import java.util.Collection;
@@ -35,6 +35,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
similarity index 95%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
index 4950eaa7..4d3dc215 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeAssignmentTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import java.util.Collections;
import java.util.List;
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.proto.RssProtos;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeTest.java
similarity index 95%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeTest.java
index 6a465b99..63970e50 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/PartitionRangeTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.assignment;
import org.junit.jupiter.api.Test;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
similarity index 96%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategyTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
index fa21352c..26329d7d 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ContinuousSelectPartitionStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.partition;
import java.util.Collection;
import java.util.Comparator;
@@ -30,12 +30,12 @@ import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.coordinator.ServerNode;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ContinuousSelectPartitionStrategyTest {
- private int shuffleNodesMax = 5;
- private Set<String> tags = Sets.newHashSet("test");
+ private final Set<String> tags = Sets.newHashSet("test");
@Test
public void test() throws Exception {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
similarity index 93%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index eefda99d..9a0b0b33 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
@@ -24,6 +24,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -64,9 +67,9 @@ public class AppBalanceSelectStorageStrategyTest {
assertEquals(0,
applicationManager.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get());
assertEquals(0,
applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
String storageHost1 = "path1";
- assertEquals(0.0,
CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+ assertEquals(0.0,
CoordinatorMetrics.GAUGE_USED_REMOTE_STORAGE.get(storageHost1).get(), 0.5);
String storageHost2 = "path2";
- assertEquals(0.0,
CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+ assertEquals(0.0,
CoordinatorMetrics.GAUGE_USED_REMOTE_STORAGE.get(storageHost2).get(), 0.5);
// init readWriteRankScheduler
Thread.sleep(2000);
// do inc for remotePath1 to make sure pick storage will be remotePath2 in
next call
@@ -85,7 +88,7 @@ public class AppBalanceSelectStorageStrategyTest {
Thread.sleep(appExpiredTime + 2000);
assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1));
assertEquals(0,
applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get());
- assertEquals(0.0,
CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+ assertEquals(0.0,
CoordinatorMetrics.GAUGE_USED_REMOTE_STORAGE.get(storageHost2).get(), 0.5);
// refresh app1, got remotePath2, then remove remotePath2,
// it should be existed in counter until it expired
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
similarity index 95%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 7273318f..dd62fd35 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.strategy.storage;
import java.io.File;
@@ -31,6 +31,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.ApplicationManager;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import static
org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -97,9 +100,9 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
assertEquals(0,
applicationManager.getRemoteStoragePathRankValue().get(remoteStorage2).getCostTime().get());
String storageHost1 = "p1";
- assertEquals(0.0,
CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5);
+ assertEquals(0.0,
CoordinatorMetrics.GAUGE_USED_REMOTE_STORAGE.get(storageHost1).get(), 0.5);
String storageHost2 = "p2";
- assertEquals(0.0,
CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5);
+ assertEquals(0.0,
CoordinatorMetrics.GAUGE_USED_REMOTE_STORAGE.get(storageHost2).get(), 0.5);
// compare with two remote path
applicationManager.incRemoteStorageCounter(remoteStorage1);
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorUtilsTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/util/CoordinatorUtilsTest.java
similarity index 99%
rename from
coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorUtilsTest.java
rename to
coordinator/src/test/java/org/apache/uniffle/coordinator/util/CoordinatorUtilsTest.java
index f862cf79..f658ec3e 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorUtilsTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/util/CoordinatorUtilsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.coordinator.util;
import java.util.List;
import java.util.Map;
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index 90d20aea..da02a579 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -87,7 +87,7 @@ This document will introduce how to deploy Uniffle
coordinators.
|rss.coordinator.dynamicClientConf.path|-|The path of configuration file which
have default conf for rss client|
|rss.coordinator.exclude.nodes.file.path|-|The path of configuration file
which have exclude nodes|
|rss.coordinator.exclude.nodes.check.interval.ms|60000|Update interval (ms)
for exclude nodes|
-|rss.coordinator.access.checkers|org.apache.uniffle.coordinator.AccessClusterLoadChecker|The
access checkers will be used when the spark client use the
DelegationShuffleManager, which will decide whether to use rss according to the
result of the specified access checkers|
+|rss.coordinator.access.checkers|org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker|The
access checkers will be used when the spark client uses the
DelegationShuffleManager, which will decide whether to use rss according to the
result of the specified access checkers|
|rss.coordinator.access.loadChecker.memory.percentage|15.0|The minimal
percentage of available memory percentage of a server|
|rss.coordinator.dynamicClientConf.enabled|false|whether to enable dynamic
client conf, which will be fetched by spark client|
|rss.coordinator.dynamicClientConf.path|-|The dynamic client conf of this
cluster and can be stored in HDFS or local|
@@ -102,6 +102,9 @@ This document will introduce how to deploy Uniffle
coordinators.
|rss.coordinator.startup-silent-period.enabled|false|Enable the
startup-silent-period to reject the assignment requests for avoiding partial
assignments. To avoid service interruption, this mechanism is disabled by
default. Especially it's recommended to use in coordinator HA mode when
restarting single coordinator.|
|rss.coordinator.startup-silent-period.duration|20000|The waiting duration(ms)
when conf of rss.coordinator.startup-silent-period.enabled is enabled.|
|rss.coordinator.select.partition.strategy|ROUND|There are two strategies for
selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate
partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive
partitions to ShuffleServer, this feature can improve performance in AQE
scenarios.|
+|rss.coordinator.quota.update.interval|60000|Update interval for the default
number of submitted apps per user.|
+|rss.coordinator.quota.default.path|-|A configuration file for the number of
apps for a user-defined user.|
+|rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
### AccessClusterLoadChecker settings
|Property Name|Default| Description|
diff --git a/docs/uniffle-migration-guide.md b/docs/uniffle-migration-guide.md
new file mode 100644
index 00000000..6d8edbdd
--- /dev/null
+++ b/docs/uniffle-migration-guide.md
@@ -0,0 +1,25 @@
+---
+layout: page
+displayTitle: Uniffle Migration Guide
+title: Uniffle Migration Guide
+description: Uniffle Migration Guide
+license: |
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+---
+
+# Upgrading from Coordinator 0.6 to 0.7
+
++ Since we have restructured the class files under coordinator, for the
`rss.coordinator.access.checkers` parameter, the original value
`org.apache.uniffle.coordinator.AccessClusterLoadChecker` has been replaced with
`org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker`,
`org.apache.uniffle.coordinator.AccessCandidatesChecker` has been replaced with
`org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker`. In
addition, `org.apache.uniffle.coordinator.access.c [...]
\ No newline at end of file
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index 71d3c9ec..9c483192 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -29,12 +29,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.AccessCandidatesChecker;
-import org.apache.uniffle.coordinator.AccessInfo;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.storage.HdfsTestBase;
import static java.lang.Thread.sleep;
@@ -70,7 +70,7 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC, 1);
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
clusterPrefix);
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker");
ApplicationManager applicationManager = new ApplicationManager(conf);
// file load checking at startup
Exception expectedException = null;
@@ -81,7 +81,7 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.AccessCandidatesChecker.<init>()"));
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidatesFile);
expectedException = null;
try {
@@ -91,7 +91,7 @@ public class AccessCandidatesCheckerHdfsTest extends
HdfsTestBase {
}
assertNotNull(expectedException);
assertTrue(expectedException.getMessage().contains(
- "NoSuchMethodException:
org.apache.uniffle.coordinator.AccessCandidatesChecker.<init>()"));
+ "NoSuchMethodException:
org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker.<init>()"));
Path path = new Path(candidatesFile);
FSDataOutputStream out = fs.create(path);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
index 5b5c75f6..d997e8c4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.KerberizedHdfsBase;
-import org.apache.uniffle.coordinator.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
public class AccessCandidatesCheckerKerberizedHdfsTest extends
KerberizedHdfsBase {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index d65bf00b..7bf1b71e 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -39,11 +39,11 @@ import
org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.coordinator.AccessCheckResult;
-import org.apache.uniffle.coordinator.AccessChecker;
-import org.apache.uniffle.coordinator.AccessInfo;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.access.AccessCheckResult;
+import org.apache.uniffle.coordinator.access.AccessInfo;
+import org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -126,8 +126,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
coordinatorConf.setString("rss.coordinator.access.candidates.path",
cfgFile.getAbsolutePath());
coordinatorConf.setString(
"rss.coordinator.access.checkers",
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
- + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+ +
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index a1858b84..4b5016df 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -26,7 +26,7 @@ import
org.apache.uniffle.client.request.RssApplicationInfoRequest;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.rpc.GrpcServer;
-import org.apache.uniffle.coordinator.CoordinatorGrpcMetrics;
+import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
import org.apache.uniffle.proto.RssProtos;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index 35a2ee0d..7573c616 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -38,9 +38,9 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.SimpleClusterManager;
+import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.GetShuffleAssignmentsResponse;
import org.apache.uniffle.proto.RssProtos.PartitionRangeAssignment;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index c8867a98..77eb25f6 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -29,8 +29,8 @@ import org.junit.jupiter.api.AfterAll;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.coordinator.CoordinatorMetrics;
import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.server.MockedShuffleServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index c6ad3fd5..a75f5a2c 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -83,8 +83,8 @@ public class AutoAccessTest extends IntegrationTestBase {
coordinatorConf.setString("rss.coordinator.access.candidates.path",
candidatesFile);
coordinatorConf.setString(
"rss.coordinator.access.checkers",
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
- + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+ +
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
index 45d0e774..48ef0cb5 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
@@ -43,8 +43,8 @@ public class SparkSQLWithDelegationShuffleManager extends
SparkSQLTest {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
- + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+ +
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
1);
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
index 44120958..f4edba13 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
@@ -43,8 +43,8 @@ public class SparkSQLWithDelegationShuffleManagerFallback
extends SparkSQLTest {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
- "org.apache.uniffle.coordinator.AccessCandidatesChecker,"
- + "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+ +
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH,
candidates);
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
coordinatorConf.set(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
1);
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index c7c537a7..09727db5 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -38,8 +38,8 @@ import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.apache.uniffle.coordinator.AbstractAssignmentStrategy;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.MockedShuffleServerGrpcService;
import org.apache.uniffle.server.ShuffleServer;