advancedxy commented on code in PR #1137:
URL: 
https://github.com/apache/incubator-uniffle/pull/1137#discussion_r1293484922


##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategy.java:
##########
@@ -66,8 +66,42 @@ public PartitionRangeAssignment assign(
     return new PartitionRangeAssignment(assignments);
   }
 
-  private List<ServerNode> getRequiredServers(Set<String> requiredTags, int 
expectedNum) {
-    List<ServerNode> servers = clusterManager.getServerList(requiredTags);
+  @Override
+  public PartitionRangeAssignment assign(
+      int totalPartitionNum,
+      int partitionNumPerRange,
+      int replica,
+      Set<String> requiredTags,
+      int requiredShuffleServerNumber,
+      int estimateTaskConcurrency,
+      Set<String> excludeServerNodes) {
+    int shuffleNodesMax = clusterManager.getShuffleNodesMax();
+    int expectedShuffleNodesNum = shuffleNodesMax;
+    if (requiredShuffleServerNumber < shuffleNodesMax && 
requiredShuffleServerNumber > 0) {
+      expectedShuffleNodesNum = requiredShuffleServerNumber;
+    }
+    List<ServerNode> servers =
+        getRequiredServers(requiredTags, expectedShuffleNodesNum, 
excludeServerNodes);
+    if (servers.isEmpty() || servers.size() < replica) {
+      return new PartitionRangeAssignment(null);
+    }
+
+    SortedMap<PartitionRange, List<ServerNode>> assignments =
+        getPartitionAssignment(
+            totalPartitionNum, partitionNumPerRange, replica, servers, 
estimateTaskConcurrency);
+
+    return new PartitionRangeAssignment(assignments);
+  }

Review Comment:
   There's a lot code duplication.
   
   Method should simply call the variant version only
    ```java
     public PartitionRangeAssignment assign(
         int totalPartitionNum,
         int partitionNumPerRange,
         int replica,
         Set<String> requiredTags,
         int requiredShuffleServerNumber,
         int estimateTaskConcurrency) {
      return assign(...., excludedNodes = null); // or Lists.of()
     }
   ```



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java:
##########
@@ -59,6 +59,93 @@ public PartitionBalanceAssignmentStrategy(ClusterManager 
clusterManager, Coordin
     this.clusterManager = clusterManager;
   }
 
+  @Override
+  public PartitionRangeAssignment assign(

Review Comment:
   ditto



##########
integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java:
##########
@@ -185,4 +191,35 @@ public void testReconfigureNodeMax() throws Exception {
     }
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
   }
+
+  @Test
+  public void testGetReShuffleAssignments() {
+    ShuffleWriteClientImpl shuffleWriteClient =
+        new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1, 1, 1, 
1, true, 1, 1, 10, 10);
+    shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+    Set<String> excludeServer = Sets.newConcurrentHashSet();
+    List<ShuffleServer> excludeShuffleServer =
+        shuffleServers.stream().limit(3).collect(Collectors.toList());
+    excludeShuffleServer.stream().map(ss -> 
ss.getId()).peek(excludeServer::add);
+    ShuffleAssignmentsInfo shuffleAssignmentsInfo =
+        shuffleWriteClient.getReShuffleAssignments(
+            "app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1, excludeServer);
+    List<ShuffleServerInfo> resultShuffle = Lists.newArrayList();
+    for (List<ShuffleServerInfo> ssis : 
shuffleAssignmentsInfo.getPartitionToServers().values()) {
+      resultShuffle.addAll(ssis);
+    }
+    List<String> collect =
+        resultShuffle.stream().map(ss -> 
ss.getId()).collect(Collectors.toList());
+    assertEquals(false, isContainsOne(excludeServer, 
Sets.newConcurrentHashSet(collect)));
+  }
+
+  private static boolean isContainsOne(Set<String> oneList, Set<String> 
otherList) {
+    for (String one : otherList) {
+      boolean contains = oneList.contains(one);
+      if (contains) {
+        return true;
+      }
+    }
+    return false;

Review Comment:
   use `retainAll` instead? 
https://docs.oracle.com/javase/6/docs/api/java/util/Set.html#retainAll%28java.util.Collection%29
 



##########
client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java:
##########
@@ -78,6 +78,16 @@ ShuffleAssignmentsInfo getShuffleAssignments(
       int assignmentShuffleServerNumber,
       int estimateTaskConcurrency);
 
+  ShuffleAssignmentsInfo getReShuffleAssignments(

Review Comment:
   you could add a default implementation which throws an unsupported exception.
   In that way, you don't have to provide impls for MR and tez.



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java:
##########
@@ -256,6 +256,32 @@ public List<ServerNode> getServerList(Set<String> 
requiredTags) {
     return availableNodes;
   }
 
+  @Override
+  public List<ServerNode> getServerList(Set<String> requiredTags, Set<String> 
excludeServerNodes) {
+    List<ServerNode> availableNodes = Lists.newArrayList();
+    for (ServerNode node : servers.values()) {
+      if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
+        continue;
+      }
+      // If the excludeServerNodes is null, the exclusion operation is not 
performed
+      if (excludeServerNodes != null) {
+        if (!excludeServerNodes.contains(node.getId())
+            && !excludeNodes.contains(node.getId())
+            && node.getTags().containsAll(requiredTags)
+            && ServerStatus.ACTIVE.equals(node.getStatus())) {
+          availableNodes.add(node);
+        }
+      } else {
+        if (!excludeNodes.contains(node.getId())
+            && node.getTags().containsAll(requiredTags)
+            && ServerStatus.ACTIVE.equals(node.getStatus())) {
+          availableNodes.add(node);
+        }
+      }
+    }
+    return availableNodes;

Review Comment:
   Code duplication. 
   
   It would be nice simpler to just call 
   ```java
   List<ServerNode> availableNodes = getServerList(requiredTags);
   if (Objects.isEmpty(excludeServerNodes) {
     return availableNodes;
   } else {
     return 
availableNodes.stream().filter(!excludeServerNodes::contains).toCollector(...)
   }
   ```



##########
coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java:
##########
@@ -405,4 +407,71 @@ private ServerNode 
toServerNode(ShuffleServerHeartBeatRequest request) {
         StorageInfoUtils.fromProto(request.getStorageInfoMap()),
         request.getServerId().getNettyPort());
   }
+
+  @Override
+  public void getReShuffleAssignments(

Review Comment:
   Like commented in other places, this interface might be merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to