zuston commented on code in PR #1615:
URL: https://github.com/apache/incubator-uniffle/pull/1615#discussion_r1568117273


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java:
##########
@@ -85,33 +126,125 @@ public int getShuffleId() {
     return shuffleId;
   }
 
-  public boolean isExistingFaultyServer(String serverId) {
-    return faultyServerReplacements.containsKey(serverId);
+  @VisibleForTesting
+  protected boolean isMarkedAsFaultyServer(String serverId) {
+    return faultyServerToReplacements.containsKey(serverId);
   }
 
-  public ShuffleServerInfo useExistingReassignmentForMultiPartitions(
-      Set<Integer> partitionIds, String faultyServerId) {
-    return createNewReassignmentForMultiPartitions(partitionIds, faultyServerId, null);
+  public Set<ShuffleServerInfo> getExistingReplacements(String faultyServerId) {
+    return faultyServerToReplacements.get(faultyServerId);
   }
 
-  public ShuffleServerInfo createNewReassignmentForMultiPartitions(
-      Set<Integer> partitionIds, String faultyServerId, ShuffleServerInfo replacement) {
-    if (replacement != null) {
-      faultyServerReplacements.put(faultyServerId, replacement);
+  public void updateReassignment(
+      Set<Integer> partitionIds, String faultyServerId, Set<ShuffleServerInfo> replacements) {
+    if (replacements == null) {
+      return;
     }
 
-    replacement = faultyServerReplacements.get(faultyServerId);
+    faultyServerToReplacements.put(faultyServerId, replacements);
+    // todo: optimize the multiple for performance
     for (Integer partitionId : partitionIds) {
-      List<ShuffleServerInfo> replicaServers = partitionToServers.get(partitionId);
-      for (int i = 0; i < replicaServers.size(); i++) {
-        if (replicaServers.get(i).getId().equals(faultyServerId)) {
-          Map<Integer, ShuffleServerInfo> replicaReplacements =
-              failoverPartitionServers.computeIfAbsent(
-                  partitionId, k -> JavaUtils.newConcurrentMap());
-          replicaReplacements.put(i, replacement);
+      Map<Integer, List<ShuffleServerInfo>> replicaServers =
+          partitionReplicaAssignedServers.get(partitionId);
+      for (Map.Entry<Integer, List<ShuffleServerInfo>> serverEntry : replicaServers.entrySet()) {
+        List<ShuffleServerInfo> servers = serverEntry.getValue();
+        if (servers.stream()
+            .map(x -> x.getId())
+            .collect(Collectors.toSet())
+            .contains(faultyServerId)) {
+          Set<ShuffleServerInfo> tempSet = new HashSet<>();
+          tempSet.addAll(replacements);
+          tempSet.removeAll(servers);
+          servers.addAll(tempSet);

Review Comment:
   Yes.
   
   The name `faulty servers` is not accurate here (I will not change it in
this PR, maybe later), because under tight memory NO_BUFFER will be thrown
by a server that is not actually faulty. In these cases, we had better
reassign the partition to another server for writing data.
   
   So these servers are kept in the list so that the partial data on them can
still be fetched when reading.
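
To illustrate the point above, here is a minimal sketch of the "append, do not replace" behavior, assuming plain string ids in place of `ShuffleServerInfo`. The names `ReassignmentSketch` and `appendReplacements` are hypothetical and are not part of this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: server ids are plain strings here, not ShuffleServerInfo.
final class ReassignmentSketch {

  // Appends the replacements that are not yet assigned to this replica.
  // The server marked as "faulty" stays in the list, so readers can still
  // fetch the partial data that was written to it before the reassignment.
  static List<String> appendReplacements(
      List<String> servers, String faultyServerId, Set<String> replacements) {
    if (replacements == null || !servers.contains(faultyServerId)) {
      return servers;
    }
    Set<String> toAdd = new LinkedHashSet<>(replacements);
    toAdd.removeAll(servers); // skip servers that are already assigned
    servers.addAll(toAdd);
    return servers;
  }

  public static void main(String[] args) {
    List<String> servers = new ArrayList<>(Arrays.asList("server-a", "server-b"));
    Set<String> replacements = new LinkedHashSet<>(Arrays.asList("server-b", "server-c"));
    appendReplacements(servers, "server-a", replacements);
    System.out.println(servers); // prints [server-a, server-b, server-c]
  }
}
```

In this sketch `server-a` remains first in the list after reassignment, which mirrors why the original server is reserved: writes can move on to the appended servers while reads still cover the data left on the original one.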



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
