This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 774d64edc [MINOR] feat(spark): Support reporting more error messages
from client when failing to send blocks (#1914)
774d64edc is described below
commit 774d64edc42214f8b34ccbd6b321769c6e9c3035
Author: maobaolong <[email protected]>
AuthorDate: Thu Jul 18 02:50:19 2024 +0800
[MINOR] feat(spark): Support reporting more error messages from client when
failing to send blocks (#1914)
### What changes were proposed in this pull request?
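When blocks fail to send and resending is disabled, include the status code of a failed block in the
`RssSendFailedException` message, instead of only reporting `Fail to send the block`.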
### Why are the changes needed?
Exposing the error code lets users diagnose and resolve send failures on their own.
### Does this PR introduce _any_ user-facing change?
No. The only difference is a more detailed exception message when blocks fail to send.
### How was this patch tested?
Tested in our own environment.
---
.../spark/shuffle/writer/RssShuffleWriter.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 6660a5e7b..870141c4b 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -101,6 +101,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(RssShuffleWriter.class);
private static final String DUMMY_HOST = "dummy_host";
private static final int DUMMY_PORT = 99999;
+ public static final String DEFAULT_ERROR_MESSAGE = "Default Error Message";
private final String appId;
private final int shuffleId;
@@ -513,23 +514,32 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (blockFailSentRetryEnabled) {
collectFailedBlocksToResend();
} else {
- if (hasAnyBlockFailure()) {
- throw new RssSendFailedException("Fail to send the block");
+ String errorMsg = getFirstBlockFailure();
+ if (errorMsg != null) {
+ throw new RssSendFailedException("Fail to send the block. Error: " +
errorMsg);
}
}
}
- private boolean hasAnyBlockFailure() {
+ private String getFirstBlockFailure() {
Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
+ List<TrackingBlockStatus> trackingBlockStatues =
+ shuffleManager
+ .getBlockIdsFailedSendTracker(taskId)
+ .getFailedBlockStatus(failedBlockIds.iterator().next());
+ String errorMsg = DEFAULT_ERROR_MESSAGE;
+ if (CollectionUtils.isNotEmpty(trackingBlockStatues)) {
+ errorMsg = trackingBlockStatues.get(0).getStatusCode().name();
+ }
LOG.error(
"Errors on sending blocks for task[{}]. {} blocks can't be sent to
remote servers: {}",
taskId,
failedBlockIds.size(),
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
- return true;
+ return errorMsg;
}
- return false;
+ return null;
}
private void collectFailedBlocksToResend() {