jerqi commented on code in PR #1610:
URL:
https://github.com/apache/incubator-uniffle/pull/1610#discussion_r1547112219
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -422,45 +454,63 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
}
}
- private void checkIfBlocksFailed() {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
- if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
- Set<TrackingBlockStatus> shouldResendBlockSet =
shouldResendBlockStatusSet(failedBlockIds);
- try {
- reSendFailedBlockIds(shouldResendBlockSet);
- } catch (Exception e) {
- LOG.error("resend failed blocks failed.", e);
+ private void checkDataIfAnyFailure() {
+ if (isBlockFailSentRetryEnabled) {
+ collectFailedBlocksToResend();
+ } else {
+ if (hasAnyBlockFailure()) {
+ throw new RssSendFailedException("Fail to send the block");
}
- failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
}
+ }
+
+ private boolean hasAnyBlockFailure() {
+ Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
- String errorMsg =
- "Send failed: Task["
- + taskId
- + "]"
- + " failed because "
- + failedBlockIds.size()
- + " blocks can't be sent to shuffle server: "
- +
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
- LOG.error(errorMsg);
- throw new RssSendFailedException(errorMsg);
+ LOG.error(
+ "Errors on sending blocks for task[{}]. {} blocks can't be sent to
remote servers: {}",
+ taskId,
+ failedBlockIds.size(),
+
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
+ return true;
}
+ return false;
}
- private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long>
failedBlockIds) {
- FailedBlockSendTracker failedBlockTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
- Set<TrackingBlockStatus> resendBlockStatusSet = Sets.newHashSet();
- for (Long failedBlockId : failedBlockIds) {
- failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
- // todo: more status need reassign
- .filter(
- trackingBlockStatus -> trackingBlockStatus.getStatusCode() ==
StatusCode.NO_BUFFER)
- .forEach(trackingBlockStatus ->
resendBlockStatusSet.add(trackingBlockStatus));
+ private void collectFailedBlocksToResend() {
+ if (!isBlockFailSentRetryEnabled) {
+ return;
}
- return resendBlockStatusSet;
+
+ FailedBlockSendTracker failedTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
+ Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
+ if (failedBlockIds == null || failedBlockIds.isEmpty()) {
Review Comment:
Could you use CollectionUtils here too (e.g. `CollectionUtils.isEmpty(failedBlockIds)`) instead of the explicit null-and-empty check?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]