zuston commented on code in PR #1610:
URL: https://github.com/apache/incubator-uniffle/pull/1610#discussion_r1547110824


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -419,89 +434,108 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
     }
   }
 
-  private void checkIfBlocksFailed() {
-    Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
-    if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
-      Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
-      try {
-        reSendFailedBlockIds(shouldResendBlockSet);
-      } catch (Exception e) {
-        LOG.error("resend failed blocks failed.", e);
+  private void checkDataIfAnyFailure() {
+    if (isBlockFailSentRetryEnabled) {
+      collectFailedBlocksToResend();
+    } else {
+      if (hasAnyBlockFailure()) {
+        throw new RssSendFailedException("Send fail");
       }
-      failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
     }
+  }
+
+  private boolean hasAnyBlockFailure() {
+    Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
     if (!failedBlockIds.isEmpty()) {
-      String errorMsg =
-          "Send failed: Task["
-              + taskId
-              + "]"
-              + " failed because "
-              + failedBlockIds.size()
-              + " blocks can't be sent to shuffle server: "
-              + shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
-      LOG.error(errorMsg);
-      throw new RssSendFailedException(errorMsg);
+      LOG.error(
+          "Errors on sending blocks for task[{}]. {} blocks can't be sent to remote servers: {}",
+          taskId,
+          failedBlockIds.size(),
+          shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
+      return true;
     }
+    return false;
   }
 
-  private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long> failedBlockIds) {
-    FailedBlockSendTracker failedBlockTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
-    Set<TrackingBlockStatus> resendBlockStatusSet = Sets.newHashSet();
-    for (Long failedBlockId : failedBlockIds) {
-      failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
-          // todo: more status need reassign
-          .filter(
-              trackingBlockStatus -> trackingBlockStatus.getStatusCode() == StatusCode.NO_BUFFER)
-          .forEach(trackingBlockStatus -> resendBlockStatusSet.add(trackingBlockStatus));
+  private void collectFailedBlocksToResend() {
+    if (!isBlockFailSentRetryEnabled) {
+      return;
+    }
+
+    FailedBlockSendTracker failedTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
+    Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
+    if (failedBlockIds == null || failedBlockIds.isEmpty()) {
+      return;
     }
-    return resendBlockStatusSet;
+
+    Set<TrackingBlockStatus> resendCandidates = new HashSet<>();
+    // to check whether the blocks resent exceed the max resend count.
+    for (Long blockId : failedBlockIds) {
+      List<TrackingBlockStatus> retryRecords = failedTracker.getFailedBlockStatus(blockId);
+      // todo: support retry times by config
+      if (retryRecords.size() >= blockFailSentMaxTimes) {
+        LOG.error(
+            "Partial blocks for taskId: [{}] retry exceeding the max retry times. Fast fail! faulty server list: {}",
+            taskId,
+            retryRecords.stream().map(x -> x.getShuffleServerInfo()).collect(Collectors.toSet()));
+        // fast fail if any blocks failure with multiple retry times
+        throw new RssSendFailedException(
+            "Errors on resending the blocks data to the remote shuffle-server.");
+      }
+
+      // todo: if setting multi replica and another replica is succeed to send, no need to resend
+      resendCandidates.add(retryRecords.get(retryRecords.size() - 1));
+    }
+
+    resendFailedBlocks(resendCandidates);
   }
 
-  private void reSendFailedBlockIds(Set<TrackingBlockStatus> failedBlockStatusSet) {
+  private void resendFailedBlocks(Set<TrackingBlockStatus> failedBlockStatusSet) {
     List<ShuffleBlockInfo> reAssignSeverBlockInfoList = Lists.newArrayList();
     List<ShuffleBlockInfo> failedBlockInfoList = Lists.newArrayList();
     Map<ShuffleServerInfo, List<TrackingBlockStatus>> faultyServerToPartitions =
         failedBlockStatusSet.stream().collect(Collectors.groupingBy(d -> d.getShuffleServerInfo()));
-    Map<String, ShuffleServerInfo> faultyServers = shuffleManager.getReassignedFaultyServers();
     faultyServerToPartitions.entrySet().stream()
         .forEach(
             t -> {
               Set<String> partitionIds =
                   t.getValue().stream()
                       .map(x -> String.valueOf(x.getShuffleBlockInfo().getPartitionId()))
                       .collect(Collectors.toSet());
-              ShuffleServerInfo dynamicShuffleServer = faultyServers.get(t.getKey().getId());
+              ShuffleServerInfo dynamicShuffleServer =
+                  failoverShuffleServers.get(t.getKey().getId());
               if (dynamicShuffleServer == null) {
+                // todo: merge multiple requests into one.
                 dynamicShuffleServer =
                     reAssignFaultyShuffleServer(partitionIds, t.getKey().getId());
-                faultyServers.put(t.getKey().getId(), dynamicShuffleServer);
+                failoverShuffleServers.put(t.getKey().getId(), dynamicShuffleServer);
               }
 
               ShuffleServerInfo finalDynamicShuffleServer = dynamicShuffleServer;
-              failedBlockStatusSet.forEach(
-                  trackingBlockStatus -> {
-                    ShuffleBlockInfo failedBlockInfo = trackingBlockStatus.getShuffleBlockInfo();
-                    failedBlockInfoList.add(failedBlockInfo);
-                    reAssignSeverBlockInfoList.add(
-                        new ShuffleBlockInfo(
-                            failedBlockInfo.getShuffleId(),
-                            failedBlockInfo.getPartitionId(),
-                            failedBlockInfo.getBlockId(),
-                            failedBlockInfo.getLength(),
-                            failedBlockInfo.getCrc(),
-                            failedBlockInfo.getData(),
-                            Lists.newArrayList(finalDynamicShuffleServer),
-                            failedBlockInfo.getUncompressLength(),
-                            failedBlockInfo.getFreeMemory(),
-                            taskAttemptId));
-                  });
+              for (TrackingBlockStatus blockStatus : failedBlockStatusSet) {
+                ShuffleBlockInfo failedBlockInfo = blockStatus.getShuffleBlockInfo();
+                failedBlockInfoList.add(failedBlockInfo);
+                ShuffleBlockInfo newBlock =

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to