zuston commented on code in PR #1615:
URL: https://github.com/apache/incubator-uniffle/pull/1615#discussion_r1568774511


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java:
##########
@@ -85,33 +126,125 @@ public int getShuffleId() {
     return shuffleId;
   }
 
-  public boolean isExistingFaultyServer(String serverId) {
-    return faultyServerReplacements.containsKey(serverId);
+  @VisibleForTesting
+  protected boolean isMarkedAsFaultyServer(String serverId) {
+    return faultyServerToReplacements.containsKey(serverId);
   }
 
-  public ShuffleServerInfo useExistingReassignmentForMultiPartitions(
-      Set<Integer> partitionIds, String faultyServerId) {
-    return createNewReassignmentForMultiPartitions(partitionIds, faultyServerId, null);
+  public Set<ShuffleServerInfo> getExistingReplacements(String faultyServerId) {
+    return faultyServerToReplacements.get(faultyServerId);
   }
 
-  public ShuffleServerInfo createNewReassignmentForMultiPartitions(
-      Set<Integer> partitionIds, String faultyServerId, ShuffleServerInfo replacement) {
-    if (replacement != null) {
-      faultyServerReplacements.put(faultyServerId, replacement);
+  public void updateReassignment(
+      Set<Integer> partitionIds, String faultyServerId, Set<ShuffleServerInfo> replacements) {
+    if (replacements == null) {
+      return;
     }
 
-    replacement = faultyServerReplacements.get(faultyServerId);
+    faultyServerToReplacements.put(faultyServerId, replacements);
+    // todo: optimize the multiple for performance
     for (Integer partitionId : partitionIds) {
-      List<ShuffleServerInfo> replicaServers = partitionToServers.get(partitionId);
-      for (int i = 0; i < replicaServers.size(); i++) {
-        if (replicaServers.get(i).getId().equals(faultyServerId)) {
-          Map<Integer, ShuffleServerInfo> replicaReplacements =
-              failoverPartitionServers.computeIfAbsent(
-                  partitionId, k -> JavaUtils.newConcurrentMap());
-          replicaReplacements.put(i, replacement);
+      Map<Integer, List<ShuffleServerInfo>> replicaServers =
+          partitionReplicaAssignedServers.get(partitionId);
+      for (Map.Entry<Integer, List<ShuffleServerInfo>> serverEntry : replicaServers.entrySet()) {
+        List<ShuffleServerInfo> servers = serverEntry.getValue();
+        if (servers.stream()
+            .map(x -> x.getId())
+            .collect(Collectors.toSet())
+            .contains(faultyServerId)) {
+          Set<ShuffleServerInfo> tempSet = new HashSet<>();
+          tempSet.addAll(replacements);
+          tempSet.removeAll(servers);
+          servers.addAll(tempSet);

Review Comment:
   Oh, I think I should clarify that the reassignment occurs on the writer side. Sorry, I still don’t follow your point.
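
   For reference, the merge step in the hunk above can be sketched in isolation as follows. This is only an illustration under stated assumptions: plain `String` server IDs stand in for `ShuffleServerInfo`, and `ReassignSketch` and `appendReplacements` are hypothetical names that are not part of the PR. As in the hunk, the faulty server stays in the list and only the replacements that are not already assigned get appended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReassignSketch {

  // Hypothetical helper (not from the PR): if the replica's server list
  // contains the faulty server, append every replacement that is not
  // already in the list. Returns true when the faulty server was found.
  public static boolean appendReplacements(
      List<String> servers, String faultyServerId, Set<String> replacements) {
    if (!servers.contains(faultyServerId)) {
      return false;
    }
    // Copy first so the caller's replacement set is left untouched.
    Set<String> toAdd = new LinkedHashSet<>(replacements);
    toAdd.removeAll(servers);
    servers.addAll(toAdd);
    return true;
  }

  public static void main(String[] args) {
    List<String> servers = new ArrayList<>(Arrays.asList("s1", "s2"));
    Set<String> replacements = new LinkedHashSet<>(Arrays.asList("s2", "s3"));
    appendReplacements(servers, "s1", replacements);
    System.out.println(servers); // prints [s1, s2, s3]
  }
}
```

   Here `s2` is skipped because it is already assigned to the replica, so only `s3` is appended after the existing servers.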



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

