This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 652908519eed YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn router thread pool executor. (#6222) 652908519eed is described below commit 652908519eed5fe79b696e97cc62f2014387be31 Author: slfan1989 <55643692+slfan1...@users.noreply.github.com> AuthorDate: Fri Oct 27 04:39:06 2023 +0800 YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn router thread pool executor. (#6222) --- .../org/apache/hadoop/yarn/conf/YarnConfiguration.java | 16 ++++++++++++++++ .../src/main/resources/yarn-default.xml | 17 +++++++++++++++++ .../router/clientrm/FederationClientInterceptor.java | 8 +++++++- .../router/rmadmin/FederationRMAdminInterceptor.java | 14 +++++++++++++- 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 90a8978a228b..2a204519228a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4369,6 +4369,22 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME = TimeUnit.SECONDS.toMillis(0); // 0s + /** + * This method configures the policy for core threads regarding termination + * when no tasks arrive within the keep-alive time. + * When set to false, core threads are never terminated due to a lack of tasks. + * When set to true, the same keep-alive policy + * that applies to non-core threads also applies to core threads. + * To prevent constant thread replacement, + * ensure that the keep-alive time is greater than zero when setting it to true. + * It's advisable to call this method before the pool becomes actively used. + */ + public static final String ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = + ROUTER_PREFIX + "interceptor.user-thread-pool.allow-core-thread-time-out"; + + public static final boolean DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = + false; + /** The address of the Router web application. */ public static final String ROUTER_WEBAPP_ADDRESS = ROUTER_WEBAPP_PREFIX + "address"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9991e841d74b..72e8cc70f874 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5139,6 +5139,23 @@ </description> </property> + <property> + <name>yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out</name> + <value>false</value> + <description> + This method configures the policy for core threads regarding termination + when no tasks arrive within the keep-alive time. + When set to false, core threads are never terminated due to a lack of tasks. + When set to true, the same keep-alive policy + that applies to non-core threads also applies to core threads. + To prevent constant thread replacement, + ensure that the keep-alive time is greater than zero when setting it to true. + It's advisable to call this method before the pool becomes actively used. + We need to ensure that + yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0. + </description> + </property> + <property> <name>yarn.router.submit.interval.time</name> <value>10ms</value> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 9c3f9971d8c7..35b3e6eeb2bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -231,7 +231,13 @@ public class FederationClientInterceptor keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory); // Adding this line so that unused user threads will exit and be cleaned up if idle for too long - this.executorService.allowCoreThreadTimeOut(true); + boolean allowCoreThreadTimeOut = getConf().getBoolean( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); + + if (keepAliveTime > 0 && allowCoreThreadTimeOut) { + this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + } final Configuration conf = this.getConf(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index b7c1462a60d5..d269cfe0971c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -130,9 +130,21 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build(); + long keepAliveTime = getConf().getTimeDuration( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS); + BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); this.executorService = new ThreadPoolExecutor(numThreads, numThreads, - 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); + keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory); + + boolean allowCoreThreadTimeOut = getConf().getBoolean( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); + + if (keepAliveTime > 0 && allowCoreThreadTimeOut) { + this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + } federationFacade = FederationStateStoreFacade.getInstance(this.getConf()); this.conf = this.getConf(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org