jerqi commented on code in PR #1610:
URL:
https://github.com/apache/incubator-uniffle/pull/1610#discussion_r1546232750
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -419,89 +434,108 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
}
}
- private void checkIfBlocksFailed() {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
- if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
- Set<TrackingBlockStatus> shouldResendBlockSet =
shouldResendBlockStatusSet(failedBlockIds);
- try {
- reSendFailedBlockIds(shouldResendBlockSet);
- } catch (Exception e) {
- LOG.error("resend failed blocks failed.", e);
+ private void checkDataIfAnyFailure() {
+ if (isBlockFailSentRetryEnabled) {
+ collectFailedBlocksToResend();
+ } else {
+ if (hasAnyBlockFailure()) {
+ throw new RssSendFailedException("Send fail");
}
- failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
}
+ }
+
+ private boolean hasAnyBlockFailure() {
+ Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
- String errorMsg =
- "Send failed: Task["
- + taskId
- + "]"
- + " failed because "
- + failedBlockIds.size()
- + " blocks can't be sent to shuffle server: "
- +
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
- LOG.error(errorMsg);
- throw new RssSendFailedException(errorMsg);
+ LOG.error(
+ "Errors on sending blocks for task[{}]. {} blocks can't be sent to
remote servers: {}",
+ taskId,
+ failedBlockIds.size(),
+
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
+ return true;
}
+ return false;
}
- private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long>
failedBlockIds) {
- FailedBlockSendTracker failedBlockTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
- Set<TrackingBlockStatus> resendBlockStatusSet = Sets.newHashSet();
- for (Long failedBlockId : failedBlockIds) {
- failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
- // todo: more status need reassign
- .filter(
- trackingBlockStatus -> trackingBlockStatus.getStatusCode() ==
StatusCode.NO_BUFFER)
- .forEach(trackingBlockStatus ->
resendBlockStatusSet.add(trackingBlockStatus));
+ private void collectFailedBlocksToResend() {
+ if (!isBlockFailSentRetryEnabled) {
+ return;
+ }
+
+ FailedBlockSendTracker failedTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
+ Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
+ if (failedBlockIds == null || failedBlockIds.isEmpty()) {
+ return;
}
- return resendBlockStatusSet;
+
+ Set<TrackingBlockStatus> resendCandidates = new HashSet<>();
+ // to check whether the blocks resent exceed the max resend count.
+ for (Long blockId : failedBlockIds) {
+ List<TrackingBlockStatus> retryRecords =
failedTracker.getFailedBlockStatus(blockId);
+ // todo: support retry times by config
+ if (retryRecords.size() >= blockFailSentMaxTimes) {
+ LOG.error(
+ "Partial blocks for taskId: [{}] retry exceeding the max retry
times. Fast fail! faulty server list: {}",
+ taskId,
+ retryRecords.stream().map(x ->
x.getShuffleServerInfo()).collect(Collectors.toSet()));
+ // fast fail if any blocks failure with multiple retry times
+ throw new RssSendFailedException(
+ "Errors on resending the blocks data to the remote
shuffle-server.");
+ }
+
+ // todo: if setting multi replica and another replica is succeed to
send, no need to resend
+ resendCandidates.add(retryRecords.get(retryRecords.size() - 1));
+ }
+
+ resendFailedBlocks(resendCandidates);
}
- private void reSendFailedBlockIds(Set<TrackingBlockStatus>
failedBlockStatusSet) {
+ private void resendFailedBlocks(Set<TrackingBlockStatus>
failedBlockStatusSet) {
List<ShuffleBlockInfo> reAssignSeverBlockInfoList = Lists.newArrayList();
List<ShuffleBlockInfo> failedBlockInfoList = Lists.newArrayList();
Map<ShuffleServerInfo, List<TrackingBlockStatus>> faultyServerToPartitions
=
failedBlockStatusSet.stream().collect(Collectors.groupingBy(d ->
d.getShuffleServerInfo()));
- Map<String, ShuffleServerInfo> faultyServers =
shuffleManager.getReassignedFaultyServers();
faultyServerToPartitions.entrySet().stream()
.forEach(
t -> {
Set<String> partitionIds =
t.getValue().stream()
.map(x ->
String.valueOf(x.getShuffleBlockInfo().getPartitionId()))
.collect(Collectors.toSet());
- ShuffleServerInfo dynamicShuffleServer =
faultyServers.get(t.getKey().getId());
+ ShuffleServerInfo dynamicShuffleServer =
+ failoverShuffleServers.get(t.getKey().getId());
if (dynamicShuffleServer == null) {
+ // todo: merge multiple requests into one.
dynamicShuffleServer =
reAssignFaultyShuffleServer(partitionIds,
t.getKey().getId());
- faultyServers.put(t.getKey().getId(), dynamicShuffleServer);
+ failoverShuffleServers.put(t.getKey().getId(),
dynamicShuffleServer);
}
ShuffleServerInfo finalDynamicShuffleServer =
dynamicShuffleServer;
- failedBlockStatusSet.forEach(
- trackingBlockStatus -> {
- ShuffleBlockInfo failedBlockInfo =
trackingBlockStatus.getShuffleBlockInfo();
- failedBlockInfoList.add(failedBlockInfo);
- reAssignSeverBlockInfoList.add(
- new ShuffleBlockInfo(
- failedBlockInfo.getShuffleId(),
- failedBlockInfo.getPartitionId(),
- failedBlockInfo.getBlockId(),
- failedBlockInfo.getLength(),
- failedBlockInfo.getCrc(),
- failedBlockInfo.getData(),
- Lists.newArrayList(finalDynamicShuffleServer),
- failedBlockInfo.getUncompressLength(),
- failedBlockInfo.getFreeMemory(),
- taskAttemptId));
- });
+ for (TrackingBlockStatus blockStatus : failedBlockStatusSet) {
+ ShuffleBlockInfo failedBlockInfo =
blockStatus.getShuffleBlockInfo();
+ failedBlockInfoList.add(failedBlockInfo);
+ ShuffleBlockInfo newBlock =
Review Comment:
Why do we use a new block instead of reusing the old block?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]