This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch branch-0.3 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit f146f5cb69d8820a3b6caa6dc69caad3c45254d7 Author: onebox-li <[email protected]> AuthorDate: Thu Sep 28 10:43:08 2023 +0800 [CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks ### What changes were proposed in this pull request? Only the push and data modules need the push-timeout-checker, and only the data module needs the fetch-timeout-checker. This change stops creating the push-timeout-checker in Master/LifeCycleManager and the fetch-timeout-checker in Worker. The same applies to the related timeout-checker scheduled tasks. ### Why are the changes needed? Ditto ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Cluster test Closes #1940 from onebox-li/checker-dev. Authored-by: onebox-li <[email protected]> Signed-off-by: zky.zhoukeyong <[email protected]> (cherry picked from commit ef4fc51d5f2d9d1a707e06cac2c64f3a07082e42) Signed-off-by: zky.zhoukeyong <[email protected]> --- .../network/client/TransportResponseHandler.java | 76 +++++++++++++++------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java index 7282fa913..78120a2c1 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java @@ -79,31 +79,49 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { this.timeOfLastRequestNs = new AtomicLong(0); this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs(); this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs(); + + String module = conf.getModuleName(); + boolean checkPushTimeout = false; + boolean checkFetchTimeout = false; + if (TransportModuleConstants.DATA_MODULE.equals(module)) { + checkPushTimeout = true; + checkFetchTimeout = 
true; + } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) { + checkPushTimeout = true; + } synchronized (TransportResponseHandler.class) { - if (pushTimeoutChecker == null) { - pushTimeoutChecker = - ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); - } - if (fetchTimeoutChecker == null) { - fetchTimeoutChecker = - ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + if (checkPushTimeout) { + if (pushTimeoutChecker == null) { + pushTimeoutChecker = + ThreadUtils.newDaemonThreadPoolScheduledExecutor( + "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); + } + if (checkFetchTimeout) { + if (fetchTimeoutChecker == null) { + fetchTimeoutChecker = + ThreadUtils.newDaemonThreadPoolScheduledExecutor( + "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + } + } } } - pushCheckerScheduleFuture = - pushTimeoutChecker.scheduleAtFixedRate( - () -> failExpiredPushRequest(), - pushTimeoutCheckerInterval, - pushTimeoutCheckerInterval, - TimeUnit.MILLISECONDS); + if (checkPushTimeout) { + pushCheckerScheduleFuture = + pushTimeoutChecker.scheduleAtFixedRate( + () -> failExpiredPushRequest(), + pushTimeoutCheckerInterval, + pushTimeoutCheckerInterval, + TimeUnit.MILLISECONDS); + } - fetchCheckerScheduleFuture = - fetchTimeoutChecker.scheduleAtFixedRate( - () -> failExpiredFetchRequest(), - fetchTimeoutCheckerInterval, - fetchTimeoutCheckerInterval, - TimeUnit.MILLISECONDS); + if (checkFetchTimeout) { + fetchCheckerScheduleFuture = + fetchTimeoutChecker.scheduleAtFixedRate( + () -> failExpiredFetchRequest(), + fetchTimeoutCheckerInterval, + fetchTimeoutCheckerInterval, + TimeUnit.MILLISECONDS); + } } public void failExpiredPushRequest() { @@ -251,8 +269,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { remoteAddress); failOutstandingRequests(new IOException("Connection from " + 
remoteAddress + " closed")); } - pushCheckerScheduleFuture.cancel(false); - fetchCheckerScheduleFuture.cancel(false); + if (pushCheckerScheduleFuture != null) { + pushCheckerScheduleFuture.cancel(false); + } + if (fetchCheckerScheduleFuture != null) { + fetchCheckerScheduleFuture.cancel(false); + } } @Override @@ -265,8 +287,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { remoteAddress); failOutstandingRequests(cause); } - pushCheckerScheduleFuture.cancel(false); - fetchCheckerScheduleFuture.cancel(false); + if (pushCheckerScheduleFuture != null) { + pushCheckerScheduleFuture.cancel(false); + } + if (fetchCheckerScheduleFuture != null) { + fetchCheckerScheduleFuture.cancel(false); + } } @Override
