This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 652908519eed YARN-11588. [Federation] [Addendum] Fix uncleaned threads 
in yarn router thread pool executor. (#6222)
652908519eed is described below

commit 652908519eed5fe79b696e97cc62f2014387be31
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Oct 27 04:39:06 2023 +0800

    YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn router 
thread pool executor. (#6222)
---
 .../org/apache/hadoop/yarn/conf/YarnConfiguration.java  | 16 ++++++++++++++++
 .../src/main/resources/yarn-default.xml                 | 17 +++++++++++++++++
 .../router/clientrm/FederationClientInterceptor.java    |  8 +++++++-
 .../router/rmadmin/FederationRMAdminInterceptor.java    | 14 +++++++++++++-
 4 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 90a8978a228b..2a204519228a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4369,6 +4369,22 @@ public class YarnConfiguration extends Configuration {
   public static final long 
DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
       TimeUnit.SECONDS.toMillis(0); // 0s
 
+  /**
+   * This property configures the policy for core threads regarding termination
+   * when no tasks arrive within the keep-alive time.
+   * When set to false, core threads are never terminated due to a lack of 
tasks.
+   * When set to true, the same keep-alive policy
+   * that applies to non-core threads also applies to core threads.
+   * To prevent constant thread replacement,
+   * ensure that the keep-alive time is greater than zero when setting it to 
true.
+   * The setting is applied when the router interceptor creates its thread pool.
+   */
+  public static final String 
ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+      ROUTER_PREFIX + 
"interceptor.user-thread-pool.allow-core-thread-time-out";
+
+  public static final boolean 
DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
+      false;
+
   /** The address of the Router web application. */
   public static final String ROUTER_WEBAPP_ADDRESS =
       ROUTER_WEBAPP_PREFIX + "address";
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 9991e841d74b..72e8cc70f874 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5139,6 +5139,23 @@
     </description>
   </property>
 
+  <property>
+    
<name>yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out</name>
+    <value>false</value>
+    <description>
+      This property configures the policy for core threads regarding termination
+      when no tasks arrive within the keep-alive time.
+      When set to false, core threads are never terminated due to a lack of 
tasks.
+      When set to true, the same keep-alive policy
+      that applies to non-core threads also applies to core threads.
+      To prevent constant thread replacement,
+      ensure that the keep-alive time is greater than zero when setting it to 
true.
+      The setting is applied when the router interceptor creates its thread pool.
+      We need to ensure that
+        yarn.router.interceptor.user-thread-pool.keep-alive-time is greater 
than 0.
+    </description>
+  </property>
+
   <property>
     <name>yarn.router.submit.interval.time</name>
     <value>10ms</value>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 9c3f9971d8c7..35b3e6eeb2bd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -231,7 +231,13 @@ public class FederationClientInterceptor
         keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
     // Adding this line so that unused user threads will exit and be cleaned 
up if idle for too long
-    this.executorService.allowCoreThreadTimeOut(true);
+    boolean allowCoreThreadTimeOut =  getConf().getBoolean(
+        
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+        
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+    if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+      this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+    }
 
     final Configuration conf = this.getConf();
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
index b7c1462a60d5..d269cfe0971c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
@@ -130,9 +130,21 @@ public class FederationRMAdminInterceptor extends 
AbstractRMAdminRequestIntercep
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d 
").build();
 
+    long keepAliveTime = getConf().getTimeDuration(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
+        
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, 
TimeUnit.SECONDS);
+
     BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
-        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+        keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+
+    boolean allowCoreThreadTimeOut =  getConf().getBoolean(
+        
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+        
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+
+    if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
+      this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
+    }
 
     federationFacade = FederationStateStoreFacade.getInstance(this.getConf());
     this.conf = this.getConf();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to