advancedxy commented on code in PR #1137:
URL:
https://github.com/apache/incubator-uniffle/pull/1137#discussion_r1293484922
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java:
##########
@@ -66,8 +66,42 @@ public PartitionRangeAssignment assign(
return new PartitionRangeAssignment(assignments);
}
- private List<ServerNode> getRequiredServers(Set<String> requiredTags, int expectedNum) {
- List<ServerNode> servers = clusterManager.getServerList(requiredTags);
+ @Override
+ public PartitionRangeAssignment assign(
+ int totalPartitionNum,
+ int partitionNumPerRange,
+ int replica,
+ Set<String> requiredTags,
+ int requiredShuffleServerNumber,
+ int estimateTaskConcurrency,
+ Set<String> excludeServerNodes) {
+ int shuffleNodesMax = clusterManager.getShuffleNodesMax();
+ int expectedShuffleNodesNum = shuffleNodesMax;
+ if (requiredShuffleServerNumber < shuffleNodesMax && requiredShuffleServerNumber > 0) {
+ expectedShuffleNodesNum = requiredShuffleServerNumber;
+ }
+ List<ServerNode> servers =
+ getRequiredServers(requiredTags, expectedShuffleNodesNum, excludeServerNodes);
+ if (servers.isEmpty() || servers.size() < replica) {
+ return new PartitionRangeAssignment(null);
+ }
+
+ SortedMap<PartitionRange, List<ServerNode>> assignments =
+ getPartitionAssignment(
+ totalPartitionNum, partitionNumPerRange, replica, servers, estimateTaskConcurrency);
+
+ return new PartitionRangeAssignment(assignments);
+ }
Review Comment:
There's a lot of code duplication.
This method should simply delegate to the new overload:
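```java
public PartitionRangeAssignment assign(
    int totalPartitionNum,
    int partitionNumPerRange,
    int replica,
    Set<String> requiredTags,
    int requiredShuffleServerNumber,
    int estimateTaskConcurrency) {
  // No exclusions: pass an empty set (or null) to the new overload.
  return assign(
      totalPartitionNum,
      partitionNumPerRange,
      replica,
      requiredTags,
      requiredShuffleServerNumber,
      estimateTaskConcurrency,
      Collections.emptySet());
}
```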
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java:
##########
@@ -59,6 +59,93 @@ public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager, Coordin
this.clusterManager = clusterManager;
}
+ @Override
+ public PartitionRangeAssignment assign(
Review Comment:
ditto
##########
integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java:
##########
@@ -185,4 +191,35 @@ public void testReconfigureNodeMax() throws Exception {
}
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
}
+
+ @Test
+ public void testGetReShuffleAssignments() {
+ ShuffleWriteClientImpl shuffleWriteClient =
+ new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1, 1, 1, 1, true, 1, 1, 10, 10);
+ shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+ Set<String> excludeServer = Sets.newConcurrentHashSet();
+ List<ShuffleServer> excludeShuffleServer =
+ shuffleServers.stream().limit(3).collect(Collectors.toList());
+ excludeShuffleServer.stream().map(ss -> ss.getId()).peek(excludeServer::add);
+ ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+ shuffleWriteClient.getReShuffleAssignments(
+ "app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1, excludeServer);
+ List<ShuffleServerInfo> resultShuffle = Lists.newArrayList();
+ for (List<ShuffleServerInfo> ssis : shuffleAssignmentsInfo.getPartitionToServers().values()) {
+ resultShuffle.addAll(ssis);
+ }
+ List<String> collect =
+ resultShuffle.stream().map(ss -> ss.getId()).collect(Collectors.toList());
+ assertEquals(false, isContainsOne(excludeServer, Sets.newConcurrentHashSet(collect)));
+ }
+
+ private static boolean isContainsOne(Set<String> oneList, Set<String> otherList) {
+ for (String one : otherList) {
+ boolean contains = oneList.contains(one);
+ if (contains) {
+ return true;
+ }
+ }
+ return false;
Review Comment:
use `retainAll` instead?
https://docs.oracle.com/javase/6/docs/api/java/util/Set.html#retainAll%28java.util.Collection%29
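A minimal sketch of the `retainAll` approach. `containsAnyExcluded` is a hypothetical helper name. Because `retainAll` mutates the set it is called on, the sketch intersects a copy of the assigned IDs. It also populates the excluded set with `collect`: a stream that ends in `peek` has no terminal operation, so `peek(excludeServer::add)` on its own never adds anything.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ExclusionCheckSketch {
  // Returns true if any assigned server ID also appears in the excluded set.
  // retainAll mutates its receiver, so intersect on a copy of the assigned IDs.
  static boolean containsAnyExcluded(Set<String> excluded, Set<String> assigned) {
    Set<String> intersection = new HashSet<>(assigned);
    intersection.retainAll(excluded);
    return !intersection.isEmpty();
  }

  public static void main(String[] args) {
    // Collecting directly (instead of a dangling peek) actually fills the set.
    List<String> servers = Arrays.asList("ss-1", "ss-2", "ss-3", "ss-4");
    Set<String> excluded = servers.stream().limit(3).collect(Collectors.toSet());

    Set<String> assigned = new HashSet<>(Collections.singletonList("ss-4"));
    System.out.println(containsAnyExcluded(excluded, assigned)); // false
    // Collections.disjoint answers the same question without building a copy.
    System.out.println(Collections.disjoint(excluded, assigned)); // true
  }
}
```

In the test this would become `assertFalse(containsAnyExcluded(excludeServer, Sets.newHashSet(collect)))`, or simply `assertTrue(Collections.disjoint(excludeServer, Sets.newHashSet(collect)))`.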
##########
client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java:
##########
@@ -78,6 +78,16 @@ ShuffleAssignmentsInfo getShuffleAssignments(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency);
+ ShuffleAssignmentsInfo getReShuffleAssignments(
Review Comment:
You could add a default implementation that throws an `UnsupportedOperationException`.
That way, you don't have to provide implementations for MR and Tez.
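A minimal sketch of that suggestion, using a trimmed-down, hypothetical copy of the interface. `ShuffleAssignmentsInfo` here is an empty placeholder, and the parameter list is inferred from the `getReShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1, excludeServer)` call in `CoordinatorAssignmentTest`; the parameter names are assumptions.

```java
import java.util.Collections;
import java.util.Set;

public class DefaultMethodSketch {
  // Empty placeholder for the real ShuffleAssignmentsInfo (hypothetical stub).
  static final class ShuffleAssignmentsInfo {}

  // Hypothetical, trimmed-down ShuffleWriteClient with only the new method.
  interface ShuffleWriteClient {
    // Parameter names are assumptions inferred from the test call.
    default ShuffleAssignmentsInfo getReShuffleAssignments(
        String appId,
        int shuffleId,
        int partitionNum,
        int partitionNumPerRange,
        Set<String> requiredTags,
        int assignmentShuffleServerNumber,
        int estimateTaskConcurrency,
        Set<String> excludeServers) {
      // Clients that do not support reassignment inherit this behavior.
      throw new UnsupportedOperationException(
          getClass().getName() + " does not support getReShuffleAssignments");
    }
  }

  // An MR/Tez-style client compiles without overriding the new method.
  static final class LegacyClient implements ShuffleWriteClient {}

  public static void main(String[] args) {
    ShuffleWriteClient client = new LegacyClient();
    try {
      client.getReShuffleAssignments(
          "app1", 0, 10, 1, Collections.emptySet(), 3, -1, Collections.emptySet());
      System.out.println("unexpected success");
    } catch (UnsupportedOperationException e) {
      System.out.println("unsupported: " + e.getMessage());
    }
  }
}
```

Only the Spark client (or whichever client needs reassignment) would then override the method.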
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -256,6 +256,32 @@ public List<ServerNode> getServerList(Set<String> requiredTags) {
return availableNodes;
}
+ @Override
+ public List<ServerNode> getServerList(Set<String> requiredTags, Set<String> excludeServerNodes) {
+ List<ServerNode> availableNodes = Lists.newArrayList();
+ for (ServerNode node : servers.values()) {
+ if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
+ continue;
+ }
+ // If the excludeServerNodes is null, the exclusion operation is not performed
+ if (excludeServerNodes != null) {
+ if (!excludeServerNodes.contains(node.getId())
+ && !excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
+ } else {
+ if (!excludeNodes.contains(node.getId())
+ && node.getTags().containsAll(requiredTags)
+ && ServerStatus.ACTIVE.equals(node.getStatus())) {
+ availableNodes.add(node);
+ }
+ }
+ }
+ return availableNodes;
Review Comment:
Code duplication.
It would be simpler to reuse the existing `getServerList(requiredTags)` and only filter out the excluded nodes:
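```java
@Override
public List<ServerNode> getServerList(Set<String> requiredTags, Set<String> excludeServerNodes) {
  // Reuse the existing status, tag and excludeNodes filtering.
  List<ServerNode> availableNodes = getServerList(requiredTags);
  if (excludeServerNodes == null || excludeServerNodes.isEmpty()) {
    return availableNodes;
  }
  // excludeServerNodes holds server IDs, so compare against node.getId().
  return availableNodes.stream()
      .filter(node -> !excludeServerNodes.contains(node.getId()))
      .collect(Collectors.toList());
}
```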
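A self-contained sketch of the ID-based exclusion filter, using a minimal, hypothetical stand-in for `ServerNode` (only `getId()`) and a hypothetical helper name `excludeById`. It is worth comparing on `node.getId()` explicitly: `Set.contains` takes an `Object`, so passing the `ServerNode` itself would compile but never match anything.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ExcludeByIdSketch {
  // Minimal hypothetical stand-in for the coordinator's ServerNode.
  static final class ServerNode {
    private final String id;

    ServerNode(String id) {
      this.id = id;
    }

    String getId() {
      return id;
    }
  }

  // Drops nodes whose ID is excluded; null or empty means "exclude nothing".
  static List<ServerNode> excludeById(List<ServerNode> availableNodes, Set<String> excludeServerNodes) {
    if (excludeServerNodes == null || excludeServerNodes.isEmpty()) {
      return availableNodes;
    }
    // The set holds String IDs; contains(node) would silently always be false.
    return availableNodes.stream()
        .filter(node -> !excludeServerNodes.contains(node.getId()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ServerNode> nodes =
        Arrays.asList(new ServerNode("a"), new ServerNode("b"), new ServerNode("c"));
    Set<String> excluded = new HashSet<>(Collections.singletonList("b"));
    List<String> ids =
        excludeById(nodes, excluded).stream().map(ServerNode::getId).collect(Collectors.toList());
    System.out.println(ids); // [a, c]
  }
}
```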
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java:
##########
@@ -405,4 +407,71 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
StorageInfoUtils.fromProto(request.getStorageInfoMap()),
request.getServerId().getNettyPort());
}
+
+ @Override
+ public void getReShuffleAssignments(
Review Comment:
As commented elsewhere, this interface could probably be merged with the existing one.
--
This is an automated message from the Apache Git Service.