This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 1fed29a96 [CELEBORN-1008] Adjust push/fetch timeout checker thread 
pool and tasks
1fed29a96 is described below

commit 1fed29a96ad302104ac6a24d048512fa1ebcedca
Author: onebox-li <[email protected]>
AuthorDate: Thu Sep 28 10:43:08 2023 +0800

    [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
    
    ### What changes were proposed in this pull request?
    Only the push and data modules need the push-timeout-checker, and only
    the data module needs the fetch-timeout-checker.
    This change stops creating the push-timeout-checker in
    Master/LifeCycleManager, and stops creating the fetch-timeout-checker
    in Worker.
    The same applies to the corresponding scheduled timeout-check tasks.
    
    ### Why are the changes needed?
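    Same as above: the timeout checkers are only needed by the modules
    listed, so they should not be created elsewhere.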
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Cluster test
    
    Closes #1940 from onebox-li/checker-dev.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit ef4fc51d5f2d9d1a707e06cac2c64f3a07082e42)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../network/client/TransportResponseHandler.java   | 76 +++++++++++++++-------
 1 file changed, 51 insertions(+), 25 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index 7282fa913..78120a2c1 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -79,31 +79,49 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     this.timeOfLastRequestNs = new AtomicLong(0);
     this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
     this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+
+    String module = conf.getModuleName();
+    boolean checkPushTimeout = false;
+    boolean checkFetchTimeout = false;
+    if (TransportModuleConstants.DATA_MODULE.equals(module)) {
+      checkPushTimeout = true;
+      checkFetchTimeout = true;
+    } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) {
+      checkPushTimeout = true;
+    }
     synchronized (TransportResponseHandler.class) {
-      if (pushTimeoutChecker == null) {
-        pushTimeoutChecker =
-            ThreadUtils.newDaemonThreadPoolScheduledExecutor(
-                "push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
-      }
-      if (fetchTimeoutChecker == null) {
-        fetchTimeoutChecker =
-            ThreadUtils.newDaemonThreadPoolScheduledExecutor(
-                "fetch-timeout-checker", 
conf.fetchDataTimeoutCheckerThreads());
+      if (checkPushTimeout) {
+        if (pushTimeoutChecker == null) {
+          pushTimeoutChecker =
+              ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+                  "push-timeout-checker", 
conf.pushDataTimeoutCheckerThreads());
+        }
+        if (checkFetchTimeout) {
+          if (fetchTimeoutChecker == null) {
+            fetchTimeoutChecker =
+                ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+                    "fetch-timeout-checker", 
conf.fetchDataTimeoutCheckerThreads());
+          }
+        }
       }
     }
-    pushCheckerScheduleFuture =
-        pushTimeoutChecker.scheduleAtFixedRate(
-            () -> failExpiredPushRequest(),
-            pushTimeoutCheckerInterval,
-            pushTimeoutCheckerInterval,
-            TimeUnit.MILLISECONDS);
+    if (checkPushTimeout) {
+      pushCheckerScheduleFuture =
+          pushTimeoutChecker.scheduleAtFixedRate(
+              () -> failExpiredPushRequest(),
+              pushTimeoutCheckerInterval,
+              pushTimeoutCheckerInterval,
+              TimeUnit.MILLISECONDS);
+    }
 
-    fetchCheckerScheduleFuture =
-        fetchTimeoutChecker.scheduleAtFixedRate(
-            () -> failExpiredFetchRequest(),
-            fetchTimeoutCheckerInterval,
-            fetchTimeoutCheckerInterval,
-            TimeUnit.MILLISECONDS);
+    if (checkFetchTimeout) {
+      fetchCheckerScheduleFuture =
+          fetchTimeoutChecker.scheduleAtFixedRate(
+              () -> failExpiredFetchRequest(),
+              fetchTimeoutCheckerInterval,
+              fetchTimeoutCheckerInterval,
+              TimeUnit.MILLISECONDS);
+    }
   }
 
   public void failExpiredPushRequest() {
@@ -251,8 +269,12 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
           remoteAddress);
       failOutstandingRequests(new IOException("Connection from " + 
remoteAddress + " closed"));
     }
-    pushCheckerScheduleFuture.cancel(false);
-    fetchCheckerScheduleFuture.cancel(false);
+    if (pushCheckerScheduleFuture != null) {
+      pushCheckerScheduleFuture.cancel(false);
+    }
+    if (fetchCheckerScheduleFuture != null) {
+      fetchCheckerScheduleFuture.cancel(false);
+    }
   }
 
   @Override
@@ -265,8 +287,12 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
           remoteAddress);
       failOutstandingRequests(cause);
     }
-    pushCheckerScheduleFuture.cancel(false);
-    fetchCheckerScheduleFuture.cancel(false);
+    if (pushCheckerScheduleFuture != null) {
+      pushCheckerScheduleFuture.cancel(false);
+    }
+    if (fetchCheckerScheduleFuture != null) {
+      fetchCheckerScheduleFuture.cancel(false);
+    }
   }
 
   @Override

Reply via email to