jerqi commented on code in PR #1610:
URL:
https://github.com/apache/incubator-uniffle/pull/1610#discussion_r1546235548
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -419,89 +434,108 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
}
}
- private void checkIfBlocksFailed() {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
- if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
- Set<TrackingBlockStatus> shouldResendBlockSet =
shouldResendBlockStatusSet(failedBlockIds);
- try {
- reSendFailedBlockIds(shouldResendBlockSet);
- } catch (Exception e) {
- LOG.error("resend failed blocks failed.", e);
+ private void checkDataIfAnyFailure() {
+ if (isBlockFailSentRetryEnabled) {
+ collectFailedBlocksToResend();
+ } else {
+ if (hasAnyBlockFailure()) {
+ throw new RssSendFailedException("Send fail");
}
- failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
}
+ }
+
+ private boolean hasAnyBlockFailure() {
+ Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
- String errorMsg =
- "Send failed: Task["
- + taskId
- + "]"
- + " failed because "
- + failedBlockIds.size()
- + " blocks can't be sent to shuffle server: "
- +
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
- LOG.error(errorMsg);
- throw new RssSendFailedException(errorMsg);
+ LOG.error(
+ "Errors on sending blocks for task[{}]. {} blocks can't be sent to
remote servers: {}",
+ taskId,
+ failedBlockIds.size(),
+
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
+ return true;
}
+ return false;
}
- private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long>
failedBlockIds) {
- FailedBlockSendTracker failedBlockTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
- Set<TrackingBlockStatus> resendBlockStatusSet = Sets.newHashSet();
- for (Long failedBlockId : failedBlockIds) {
- failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
- // todo: more status need reassign
- .filter(
- trackingBlockStatus -> trackingBlockStatus.getStatusCode() ==
StatusCode.NO_BUFFER)
- .forEach(trackingBlockStatus ->
resendBlockStatusSet.add(trackingBlockStatus));
+ private void collectFailedBlocksToResend() {
+ if (!isBlockFailSentRetryEnabled) {
+ return;
+ }
+
+ FailedBlockSendTracker failedTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
+ Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
+ if (failedBlockIds == null || failedBlockIds.isEmpty()) {
+ return;
}
- return resendBlockStatusSet;
+
+ Set<TrackingBlockStatus> resendCandidates = new HashSet<>();
+ // to check whether the blocks resent exceed the max resend count.
+ for (Long blockId : failedBlockIds) {
+ List<TrackingBlockStatus> retryRecords =
failedTracker.getFailedBlockStatus(blockId);
+ // todo: support retry times by config
+ if (retryRecords.size() >= blockFailSentMaxTimes) {
Review Comment:
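Do we still need a separate `retryCounter` if we already have `retryRecords`? The check above already uses `retryRecords.size()` as the retry count.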
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]