This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8d4906311 [#1743] fix: Add exception handling for thread pools (#1744)
8d4906311 is described below
commit 8d49063115ee49b18c366e8cf4c4b65de9782b45
Author: RickyMa <[email protected]>
AuthorDate: Thu May 30 14:50:07 2024 +0800
[#1743] fix: Add exception handling for thread pools (#1744)
### What changes were proposed in this pull request?
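Add exception handling for tasks submitted to thread pools: the `CompletableFuture.supplyAsync` calls in `DataPusher` and `ShuffleWriteClientImpl` now chain an `exceptionally` handler that logs the unexpected exception and returns a fallback value (`null` and `false`, respectively).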
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1743.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
---
.../apache/spark/shuffle/writer/DataPusher.java | 67 ++++++-----
.../client/impl/ShuffleWriteClientImpl.java | 127 +++++++++++----------
2 files changed, 103 insertions(+), 91 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index 1517b7173..e9ef2ba61 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -85,38 +85,43 @@ public class DataPusher implements Closeable {
throw new RssException("RssAppId should be set.");
}
return CompletableFuture.supplyAsync(
- () -> {
- String taskId = event.getTaskId();
- List<ShuffleBlockInfo> shuffleBlockInfoList =
event.getShuffleDataInfoList();
- SendShuffleDataResult result = null;
- try {
- result =
- shuffleWriteClient.sendShuffleData(
- rssAppId, shuffleBlockInfoList, () ->
!isValidTask(taskId));
- putBlockId(taskToSuccessBlockIds, taskId,
result.getSuccessBlockIds());
- putFailedBlockSendTracker(
- taskToFailedBlockSendTracker, taskId,
result.getFailedBlockSendTracker());
- } finally {
- Set<Long> succeedBlockIds =
- result.getSuccessBlockIds() == null
- ? Collections.emptySet()
- : result.getSuccessBlockIds();
- for (ShuffleBlockInfo block : shuffleBlockInfoList) {
-
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
- }
+ () -> {
+ String taskId = event.getTaskId();
+ List<ShuffleBlockInfo> shuffleBlockInfoList =
event.getShuffleDataInfoList();
+ SendShuffleDataResult result = null;
+ try {
+ result =
+ shuffleWriteClient.sendShuffleData(
+ rssAppId, shuffleBlockInfoList, () ->
!isValidTask(taskId));
+ putBlockId(taskToSuccessBlockIds, taskId,
result.getSuccessBlockIds());
+ putFailedBlockSendTracker(
+ taskToFailedBlockSendTracker, taskId,
result.getFailedBlockSendTracker());
+ } finally {
+ Set<Long> succeedBlockIds =
+ result.getSuccessBlockIds() == null
+ ? Collections.emptySet()
+ : result.getSuccessBlockIds();
+ for (ShuffleBlockInfo block : shuffleBlockInfoList) {
+
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
+ }
- List<Runnable> callbackChain =
-
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
- for (Runnable runnable : callbackChain) {
- runnable.run();
- }
- }
- return shuffleBlockInfoList.stream()
- .map(x -> x.getFreeMemory())
- .reduce((a, b) -> a + b)
- .get();
- },
- executorService);
+ List<Runnable> callbackChain =
+
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
+ for (Runnable runnable : callbackChain) {
+ runnable.run();
+ }
+ }
+ return shuffleBlockInfoList.stream()
+ .map(x -> x.getFreeMemory())
+ .reduce((a, b) -> a + b)
+ .get();
+ },
+ executorService)
+ .exceptionally(
+ ex -> {
+ LOGGER.error("Unexpected exceptions occurred while sending
shuffle data", ex);
+ return null;
+ });
}
private synchronized void putBlockId(
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index b9e523bb8..ed240c887 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -173,69 +173,76 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
serverToBlocks.entrySet()) {
CompletableFuture<Boolean> future =
CompletableFuture.supplyAsync(
- () -> {
- if (needCancelRequest.get()) {
- LOG.info("The upstream task has been failed. Abort this data
send.");
- return true;
- }
- ShuffleServerInfo ssi = entry.getKey();
- try {
- Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>
shuffleIdToBlocks =
- entry.getValue();
- // todo: compact unnecessary blocks that reach replicaWrite
- RssSendShuffleDataRequest request =
- new RssSendShuffleDataRequest(
- appId, retryMax, retryIntervalMax,
shuffleIdToBlocks);
- long s = System.currentTimeMillis();
- RssSendShuffleDataResponse response =
- getShuffleServerClient(ssi).sendShuffleData(request);
-
- String logMsg =
- String.format(
- "ShuffleWriteClientImpl sendShuffleData with %s
blocks to %s cost: %s(ms)",
- serverToBlockIds.get(ssi).size(),
- ssi.getId(),
- System.currentTimeMillis() - s);
-
- if (response.getStatusCode() == StatusCode.SUCCESS) {
- // mark a replica of block that has been sent
- serverToBlockIds
- .get(ssi)
- .forEach(
- blockId ->
blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
- if (defectiveServers != null) {
- defectiveServers.remove(ssi);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} successfully.", logMsg);
+ () -> {
+ if (needCancelRequest.get()) {
+ LOG.info("The upstream task has been failed. Abort this
data send.");
+ return true;
}
- } else {
- recordFailedBlocks(
- failedBlockSendTracker, serverToBlocks, ssi,
response.getStatusCode());
- if (defectiveServers != null) {
- defectiveServers.add(ssi);
+ ShuffleServerInfo ssi = entry.getKey();
+ try {
+ Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>
shuffleIdToBlocks =
+ entry.getValue();
+ // todo: compact unnecessary blocks that reach
replicaWrite
+ RssSendShuffleDataRequest request =
+ new RssSendShuffleDataRequest(
+ appId, retryMax, retryIntervalMax,
shuffleIdToBlocks);
+ long s = System.currentTimeMillis();
+ RssSendShuffleDataResponse response =
+ getShuffleServerClient(ssi).sendShuffleData(request);
+
+ String logMsg =
+ String.format(
+ "ShuffleWriteClientImpl sendShuffleData with %s
blocks to %s cost: %s(ms)",
+ serverToBlockIds.get(ssi).size(),
+ ssi.getId(),
+ System.currentTimeMillis() - s);
+
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
+ // mark a replica of block that has been sent
+ serverToBlockIds
+ .get(ssi)
+ .forEach(
+ blockId ->
+
blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
+ if (defectiveServers != null) {
+ defectiveServers.remove(ssi);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} successfully.", logMsg);
+ }
+ } else {
+ recordFailedBlocks(
+ failedBlockSendTracker, serverToBlocks, ssi,
response.getStatusCode());
+ if (defectiveServers != null) {
+ defectiveServers.add(ssi);
+ }
+ LOG.warn(
+ "{}, it failed wth statusCode[{}]", logMsg,
response.getStatusCode());
+ return false;
+ }
+ } catch (Exception e) {
+ recordFailedBlocks(
+ failedBlockSendTracker, serverToBlocks, ssi,
StatusCode.INTERNAL_ERROR);
+ if (defectiveServers != null) {
+ defectiveServers.add(ssi);
+ }
+ LOG.warn(
+ "Send: "
+ + serverToBlockIds.get(ssi).size()
+ + " blocks to ["
+ + ssi.getId()
+ + "] failed.",
+ e);
+ return false;
}
- LOG.warn("{}, it failed wth statusCode[{}]", logMsg,
response.getStatusCode());
+ return true;
+ },
+ dataTransferPool)
+ .exceptionally(
+ ex -> {
+ LOG.error("Unexpected exceptions occurred while sending
shuffle data", ex);
return false;
- }
- } catch (Exception e) {
- recordFailedBlocks(
- failedBlockSendTracker, serverToBlocks, ssi,
StatusCode.INTERNAL_ERROR);
- if (defectiveServers != null) {
- defectiveServers.add(ssi);
- }
- LOG.warn(
- "Send: "
- + serverToBlockIds.get(ssi).size()
- + " blocks to ["
- + ssi.getId()
- + "] failed.",
- e);
- return false;
- }
- return true;
- },
- dataTransferPool);
+ });
futures.add(future);
}