This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d9ed675c8 [#2165] fix(client): Fix the exception caused by repeated unregister shuffle in tez tasks. (#2166)
d9ed675c8 is described below

commit d9ed675c8c7e7013059fb4fe98a95489b91629eb
Author: zhengchenyu <[email protected]>
AuthorDate: Sat Oct 12 11:48:55 2024 +0800

    [#2165] fix(client): Fix the exception caused by repeated unregister shuffle in tez tasks. (#2166)
    
    ### What changes were proposed in this pull request?
    
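    Do not unregister repeatedly: when the computed unregister concurrency is 0
    (for example, no shuffle servers remain to unregister from), skip creating
    the "unregister-shuffle" thread pool and only log that no unregister is needed.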
    
    ### Why are the changes needed?
    
    Fix: #2165
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    integration test
---
 .../client/impl/ShuffleWriteClientImpl.java        | 79 +++++++++++-----------
 1 file changed, 41 insertions(+), 38 deletions(-)

diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 6f2860c11..3b7e9ba12 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -1033,46 +1033,49 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     ExecutorService executorService = null;
     try {
       int concurrency = Math.min(unregisterThreadPoolSize, 
shuffleServerInfos.size());
-      executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency, 
"unregister-shuffle");
+      if (concurrency > 0) {
+        executorService = ThreadUtils.getDaemonFixedThreadPool(concurrency, 
"unregister-shuffle");
 
-      ThreadUtils.executeTasks(
-          executorService,
-          shuffleServerInfos,
-          shuffleServerInfo -> {
-            try {
-              ShuffleServerClient client =
-                  ShuffleServerClientFactory.getInstance()
-                      .getShuffleServerClient(clientType, shuffleServerInfo, 
rssConf);
-              RssUnregisterShuffleByAppIdResponse response =
-                  client.unregisterShuffleByAppId(request);
-              if (response.getStatusCode() == StatusCode.SUCCESS) {
-                LOG.info("Successfully unregistered shuffle from {}", 
shuffleServerInfo);
-              } else {
-                LOG.warn("Failed to unregister shuffle from {}", 
shuffleServerInfo);
-              }
-            } catch (Exception e) {
-              // this request observed the unregisterRequestTimeSec timeout
-              if (e instanceof StatusRuntimeException
-                  && ((StatusRuntimeException) e).getStatus().getCode()
-                      == Status.DEADLINE_EXCEEDED.getCode()) {
-                LOG.warn(
-                    "Timeout occurred while unregistering from {}. The request 
timeout is {}s: {}",
-                    shuffleServerInfo,
-                    unregisterRequestTimeSec,
-                    ((StatusRuntimeException) e).getStatus().getDescription());
-              } else {
-                LOG.warn("Error while unregistering from {}", 
shuffleServerInfo, e);
+        ThreadUtils.executeTasks(
+            executorService,
+            shuffleServerInfos,
+            shuffleServerInfo -> {
+              try {
+                ShuffleServerClient client =
+                    ShuffleServerClientFactory.getInstance()
+                        .getShuffleServerClient(clientType, shuffleServerInfo, 
rssConf);
+                RssUnregisterShuffleByAppIdResponse response =
+                    client.unregisterShuffleByAppId(request);
+                if (response.getStatusCode() == StatusCode.SUCCESS) {
+                  LOG.info("Successfully unregistered shuffle from {}", 
shuffleServerInfo);
+                } else {
+                  LOG.warn("Failed to unregister shuffle from {}", 
shuffleServerInfo);
+                }
+              } catch (Exception e) {
+                // this request observed the unregisterRequestTimeSec timeout
+                if (e instanceof StatusRuntimeException
+                    && ((StatusRuntimeException) e).getStatus().getCode()
+                        == Status.DEADLINE_EXCEEDED.getCode()) {
+                  LOG.warn(
+                      "Timeout occurred while unregistering from {}. The 
request timeout is {}s: {}",
+                      shuffleServerInfo,
+                      unregisterRequestTimeSec,
+                      ((StatusRuntimeException) 
e).getStatus().getDescription());
+                } else {
+                  LOG.warn("Error while unregistering from {}", 
shuffleServerInfo, e);
+                }
               }
-            }
-            return null;
-          },
-          unregisterTimeMs,
-          "unregister shuffle server",
-          String.format(
-              "Please consider increasing the thread pool size (%s) or the 
overall timeout (%ss) "
-                  + "if you still think the request timeout (%ss) is 
sensible.",
-              unregisterThreadPoolSize, unregisterTimeSec, 
unregisterRequestTimeSec));
-
+              return null;
+            },
+            unregisterTimeMs,
+            "unregister shuffle server",
+            String.format(
+                "Please consider increasing the thread pool size (%s) or the 
overall timeout (%ss) "
+                    + "if you still think the request timeout (%ss) is 
sensible.",
+                unregisterThreadPoolSize, unregisterTimeSec, 
unregisterRequestTimeSec));
+      } else {
+        LOG.info("No need to unregister shuffle.");
+      }
     } finally {
       if (executorService != null) {
         executorService.shutdownNow();

Reply via email to