This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d9ed675c8 [#2165] fix(client): Fix the exception caused by repeated
unregister shuffle in tez tasks. (#2166)
d9ed675c8 is described below
commit d9ed675c8c7e7013059fb4fe98a95489b91629eb
Author: zhengchenyu <[email protected]>
AuthorDate: Sat Oct 12 11:48:55 2024 +0800
[#2165] fix(client): Fix the exception caused by repeated unregister
shuffle in tez tasks. (#2166)
### What changes were proposed in this pull request?
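Do not unregister repeatedly: when the computed unregister concurrency is zero (for example, when no shuffle servers remain to unregister from), skip creating the unregister thread pool and only log that no unregistration is needed.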
### Why are the changes needed?
Fix: #2165
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
integration test
---
.../client/impl/ShuffleWriteClientImpl.java | 79 +++++++++++-----------
1 file changed, 41 insertions(+), 38 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 6f2860c11..3b7e9ba12 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -1033,46 +1033,49 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
ExecutorService executorService = null;
try {
int concurrency = Math.min(unregisterThreadPoolSize,
shuffleServerInfos.size());
- executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency,
"unregister-shuffle");
+ if (concurrency > 0) {
+ executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency,
"unregister-shuffle");
- ThreadUtils.executeTasks(
- executorService,
- shuffleServerInfos,
- shuffleServerInfo -> {
- try {
- ShuffleServerClient client =
- ShuffleServerClientFactory.getInstance()
- .getShuffleServerClient(clientType, shuffleServerInfo,
rssConf);
- RssUnregisterShuffleByAppIdResponse response =
- client.unregisterShuffleByAppId(request);
- if (response.getStatusCode() == StatusCode.SUCCESS) {
- LOG.info("Successfully unregistered shuffle from {}",
shuffleServerInfo);
- } else {
- LOG.warn("Failed to unregister shuffle from {}",
shuffleServerInfo);
- }
- } catch (Exception e) {
- // this request observed the unregisterRequestTimeSec timeout
- if (e instanceof StatusRuntimeException
- && ((StatusRuntimeException) e).getStatus().getCode()
- == Status.DEADLINE_EXCEEDED.getCode()) {
- LOG.warn(
- "Timeout occurred while unregistering from {}. The request
timeout is {}s: {}",
- shuffleServerInfo,
- unregisterRequestTimeSec,
- ((StatusRuntimeException) e).getStatus().getDescription());
- } else {
- LOG.warn("Error while unregistering from {}",
shuffleServerInfo, e);
+ ThreadUtils.executeTasks(
+ executorService,
+ shuffleServerInfos,
+ shuffleServerInfo -> {
+ try {
+ ShuffleServerClient client =
+ ShuffleServerClientFactory.getInstance()
+ .getShuffleServerClient(clientType, shuffleServerInfo,
rssConf);
+ RssUnregisterShuffleByAppIdResponse response =
+ client.unregisterShuffleByAppId(request);
+ if (response.getStatusCode() == StatusCode.SUCCESS) {
+ LOG.info("Successfully unregistered shuffle from {}",
shuffleServerInfo);
+ } else {
+ LOG.warn("Failed to unregister shuffle from {}",
shuffleServerInfo);
+ }
+ } catch (Exception e) {
+ // this request observed the unregisterRequestTimeSec timeout
+ if (e instanceof StatusRuntimeException
+ && ((StatusRuntimeException) e).getStatus().getCode()
+ == Status.DEADLINE_EXCEEDED.getCode()) {
+ LOG.warn(
+ "Timeout occurred while unregistering from {}. The
request timeout is {}s: {}",
+ shuffleServerInfo,
+ unregisterRequestTimeSec,
+ ((StatusRuntimeException)
e).getStatus().getDescription());
+ } else {
+ LOG.warn("Error while unregistering from {}",
shuffleServerInfo, e);
+ }
}
- }
- return null;
- },
- unregisterTimeMs,
- "unregister shuffle server",
- String.format(
- "Please consider increasing the thread pool size (%s) or the
overall timeout (%ss) "
- + "if you still think the request timeout (%ss) is
sensible.",
- unregisterThreadPoolSize, unregisterTimeSec,
unregisterRequestTimeSec));
-
+ return null;
+ },
+ unregisterTimeMs,
+ "unregister shuffle server",
+ String.format(
+ "Please consider increasing the thread pool size (%s) or the
overall timeout (%ss) "
+ + "if you still think the request timeout (%ss) is
sensible.",
+ unregisterThreadPoolSize, unregisterTimeSec,
unregisterRequestTimeSec));
+ } else {
+ LOG.info("No need to unregister shuffle.");
+ }
} finally {
if (executorService != null) {
executorService.shutdownNow();