zuston commented on code in PR #1610:
URL:
https://github.com/apache/incubator-uniffle/pull/1610#discussion_r1547412671
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java:
##########
@@ -59,6 +61,15 @@ public List<Runnable> getProcessedCallbackChain() {
return processedCallbackChain;
}
+ public void withBlockProcessedCallback(
Review Comment:
done
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -694,4 +745,14 @@ private void throwFetchFailedIfNecessary(Exception e) {
}
throw new RssException(e);
}
+
+ @VisibleForTesting
+ protected void enableBlockFailSentRetry() {
+ this.isBlockFailSentRetryEnabled = true;
+ }
+
+ @VisibleForTesting
+ protected void addReassignmentShuffleServer(String shuffleId, ShuffleServerInfo replacement) {
+ failoverShuffleServers.put(shuffleId, replacement);
Review Comment:
done
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -422,45 +454,63 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
}
}
- private void checkIfBlocksFailed() {
- Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
- if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
- Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
- try {
- reSendFailedBlockIds(shouldResendBlockSet);
- } catch (Exception e) {
- LOG.error("resend failed blocks failed.", e);
+ private void checkDataIfAnyFailure() {
+ if (isBlockFailSentRetryEnabled) {
+ collectFailedBlocksToResend();
+ } else {
+ if (hasAnyBlockFailure()) {
+ throw new RssSendFailedException("Fail to send the block");
}
- failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
}
+ }
+
+ private boolean hasAnyBlockFailure() {
+ Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
- String errorMsg =
- "Send failed: Task["
- + taskId
- + "]"
- + " failed because "
- + failedBlockIds.size()
- + " blocks can't be sent to shuffle server: "
- + shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers();
- LOG.error(errorMsg);
- throw new RssSendFailedException(errorMsg);
+ LOG.error(
+ "Errors on sending blocks for task[{}]. {} blocks can't be sent to remote servers: {}",
+ taskId,
+ failedBlockIds.size(),
+ shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
+ return true;
}
+ return false;
}
- private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long> failedBlockIds) {
- FailedBlockSendTracker failedBlockTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
- Set<TrackingBlockStatus> resendBlockStatusSet = Sets.newHashSet();
- for (Long failedBlockId : failedBlockIds) {
- failedBlockTracker.getFailedBlockStatus(failedBlockId).stream()
- // todo: more status need reassign
- .filter(
- trackingBlockStatus -> trackingBlockStatus.getStatusCode() == StatusCode.NO_BUFFER)
- .forEach(trackingBlockStatus -> resendBlockStatusSet.add(trackingBlockStatus));
+ private void collectFailedBlocksToResend() {
+ if (!isBlockFailSentRetryEnabled) {
+ return;
}
- return resendBlockStatusSet;
+
+ FailedBlockSendTracker failedTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
+ Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
+ if (failedBlockIds == null || failedBlockIds.isEmpty()) {
Review Comment:
done
##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java:
##########
@@ -363,6 +367,34 @@ protected List<CompletableFuture<Long>> postBlockEvent(
List<ShuffleBlockInfo> shuffleBlockInfoList) {
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (AddBlockEvent event : bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
+ if (isBlockFailSentRetryEnabled) {
+ FailedBlockSendTracker tracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
+ event.withBlockProcessedCallback(
+ (block, isSuccessful) -> {
+ if (isSuccessful) {
+ bufferManager.freeAllocatedMemory(block.getFreeMemory());
+ block.getData().release();
+ return;
+ }
+
+ // for the exceeding max send retry blocks, we should release its data and allocated
+ // memory.
+ List<TrackingBlockStatus> record = tracker.getFailedBlockStatus(block.getBlockId());
+ if (CollectionUtils.isEmpty(record)) {
+ LOG.error(
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]