This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f66c89b6571490f9078a09724286d7db6df08fb5
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Mon Jan 20 14:26:10 2025 +0800

    HDFS-17651.[ARR] Async handler executor isolation (#7244). Contributed by hfutatzhanghb.
    
    Reviewed-by: Jian Zhang <keeprom...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../hdfs/protocolPB/AsyncRpcProtocolPBUtil.java    |  10 +-
 .../server/federation/router/RBFConfigKeys.java    |  25 ++--
 .../server/federation/router/RouterRpcServer.java  | 139 +++++++++++++++------
 .../router/async/RouterAsyncRpcClient.java         |  10 +-
 .../src/main/resources/hdfs-rbf-default.xml        |  35 +++++-
 .../protocolPB/TestAsyncRpcProtocolPBUtil.java     |   2 +-
 .../TestRouterClientSideTranslatorPB.java          |   2 +-
 .../router/async/RouterAsyncProtocolTestBase.java  |  13 +-
 .../router/async/TestRouterAsyncErasureCoding.java |  13 +-
 .../router/async/TestRouterAsyncQuota.java         |  13 +-
 .../router/async/TestRouterAsyncRpc.java           |   4 +-
 .../router/async/TestRouterAsyncRpcClient.java     |  13 +-
 .../async/TestRouterAsyncRpcMultiDestination.java  |   4 +-
 13 files changed, 190 insertions(+), 93 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
index ffab0f1c487..43bbe037390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java
@@ -55,7 +55,7 @@
 public final class AsyncRpcProtocolPBUtil {
   public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
   /** The executor used for handling responses asynchronously. */
-  private static Executor worker;
+  private static Executor asyncResponderExecutor;
 
   private AsyncRpcProtocolPBUtil() {}
 
@@ -97,7 +97,7 @@ public static <T, R> R asyncIpcClient(
       } catch (Exception ex) {
         throw warpCompletionException(ex);
       }
-    }, worker));
+    }, asyncResponderExecutor));
     return asyncReturn(clazz);
   }
 
@@ -144,10 +144,10 @@ public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
    * Sets the executor used for handling responses asynchronously within
    * the utility class.
    *
-   * @param worker The executor to be used for handling responses asynchronously.
+   * @param asyncResponderExecutor The executor to be used for handling responses asynchronously.
    */
-  public static void setWorker(Executor worker) {
-    AsyncRpcProtocolPBUtil.worker = worker;
+  public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) {
+    AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor;
   }
 
   @FunctionalInterface
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index ced6e07d985..955d98cfc85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -72,15 +72,22 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_RPC_ENABLE =
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
-  public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
-  public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
-  public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
-  public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
+  // HDFS Router Asynchronous RPC
+  public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
+      FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
+  public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
+  public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
+          FEDERATION_ROUTER_PREFIX + "async.rpc.";
+  // Example: ns1:count1,ns2:count2,ns3:count3
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT = "";
+  public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
+  public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
+  public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
+  public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
 
   public static final String DFS_ROUTER_METRICS_ENABLE =
       FEDERATION_ROUTER_PREFIX + "metrics.enable";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 060d207837f..77bebab4ade 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -18,6 +18,16 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
@@ -26,16 +36,8 @@
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static 
org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
@@ -56,6 +58,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -63,8 +66,8 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -72,6 +75,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -209,6 +213,7 @@
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 import 
org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService 
implements ClientProtocol,
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RouterRpcServer.class);
-  private ExecutorService asyncRouterHandler;
-  private ExecutorService asyncRouterResponder;
+
+  /** Name service keyword to identify fan-out calls. */
+  public static final String CONCURRENT_NS = "concurrent";
 
   /** Configuration for the RPC server. */
   private Configuration conf;
@@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService 
implements ClientProtocol,
   /** Schedule the router federation rename jobs. */
   private BalanceProcedureScheduler fedRenameScheduler;
   private boolean enableAsync;
+  private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
+  private Map<String, ExecutorService> asyncRouterHandlerExecutors = new 
ConcurrentHashMap<>();
+  private ExecutorService routerAsyncResponderExecutor;
+  private ExecutorService routerDefaultAsyncHandlerExecutor;
 
   /**
    * Construct a router RPC server.
@@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router,
     int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
         DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);
 
-    this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
-        DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
-    LOG.info("Router enable async {}", this.enableAsync);
+    this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
+        DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT);
+    LOG.info("Router enable async rpc: {}", this.enableAsync);
     if (this.enableAsync) {
-      initAsyncThreadPool();
+      initAsyncThreadPools(conf);
     }
     // Override Hadoop Common IPC setting
     int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
@@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router,
     // Create the client
     if (this.enableAsync) {
       this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
-          this.namenodeResolver, this.rpcMonitor,
-          routerStateIdContext, asyncRouterHandler);
+          this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
       this.clientProto = new RouterAsyncClientProtocol(conf, this);
       this.nnProto = new RouterAsyncNamenodeProtocol(this);
       this.routerProto = new RouterAsyncUserProtocol(this);
@@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The value of config key: {} is empty. Will use default conf.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
     }
-    AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+    String[] nsHandlers = configNsHandler.split(",");
+    for (String nsHandlerInfo : nsHandlers) {
+      String[] nsHandlerItems = nsHandlerInfo.split(":");
+      if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) 
||
+          !StringUtils.isNumeric(nsHandlerItems[1])) {
+        LOG.error("The config key: {} is incorrect! The value is {}.",
+            DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
+        continue;
+      }
+      nsAsyncHandlerCount.put(nsHandlerItems[0], 
Integer.parseInt(nsHandlerItems[1]));
+    }
+  }
+
+  private void initAsyncHandlerThreadPools4Ns(String nsId, int 
dedicatedHandlers) {
+    asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> 
Executors.newFixedThreadPool(
+        dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " 
+ id + " #")));
   }
 
   /**
@@ -2426,8 +2489,12 @@ public boolean isAsync() {
     return this.enableAsync;
   }
 
-  public Executor getAsyncRouterHandler() {
-    return asyncRouterHandler;
+  public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
+    return asyncRouterHandlerExecutors;
+  }
+
+  public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
+    return routerDefaultAsyncHandlerExecutor;
   }
 
   private static class AsyncThreadFactory implements ThreadFactory {
@@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements 
ThreadFactory {
     }
 
     @Override
-    public Thread newThread(Runnable r) {
-      return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+    public Thread newThread(@NonNull Runnable r) {
+      Thread thread = new Thread(r, namePrefix + 
threadNumber.getAndIncrement());
+      thread.setDaemon(true);
+      return thread;
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
index 32dfab93152..c214adf1f2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java
@@ -57,7 +57,6 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 import static 
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
@@ -98,7 +97,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
   private final ActiveNamenodeResolver namenodeResolver;
   /** Optional perf monitor. */
   private final RouterRpcMonitor rpcMonitor;
-  private final Executor asyncRouterHandler;
 
   /**
    * Create a router async RPC client to manage remote procedure calls to NNs.
@@ -108,17 +106,15 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
    * @param resolver A NN resolver to determine the currently active NN in HA.
    * @param monitor Optional performance monitor.
    * @param routerStateIdContext the router state context object to hold the 
state ids for all
-   * @param asyncRouterHandler async router handler
    * namespaces.
    */
   public RouterAsyncRpcClient(Configuration conf,
       Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
-      RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) {
+      RouterStateIdContext routerStateIdContext) {
     super(conf, router, resolver, monitor, routerStateIdContext);
     this.router = router;
     this.namenodeResolver = resolver;
     this.rpcMonitor = monitor;
-    this.asyncRouterHandler = asyncRouterHandler;
   }
 
   /**
@@ -172,6 +168,7 @@ public Object invokeMethod(
           " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
+    String nsid = namenodes.get(0).getNameserviceId();
     // transfer threadLocalContext to worker threads of executor.
     ThreadLocalContext threadLocalContext = new ThreadLocalContext();
     asyncComplete(null);
@@ -183,7 +180,8 @@ public Object invokeMethod(
       threadLocalContext.transfer();
       invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
           useObserver, protocol, method, params);
-    }, asyncRouterHandler);
+    }, 
router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
+        router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
     return null;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 81dbf4273d4..46d273ab522 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -49,13 +49,40 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.enable</name>
+    <name>dfs.federation.router.async.rpc.enable</name>
     <value>false</value>
     <description>
       If true, router will process the RPC request asynchronously.
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.async.rpc.ns.handler.count</name>
+    <value></value>
+    <description>
+      The number of asynchronous handlers per nameservice, separated by commas, internally separated by colons.
+      The identifier of nameservice is in dfs.nameservices configuration entry.
+      Such as: ns1:count1,ns2:count2,ns3:count3.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
+    <value>10</value>
+    <description>
+      For those nameservices not in dfs.federation.router.async.rpc.ns.handler.count configuration entry,
+      use this value as the asynchronous handler thread counts.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
+    <value>10</value>
+    <description>
+      The thread counts of async responder executor.
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.rpc-address</name>
     <value>0.0.0.0:8888</value>
@@ -110,15 +137,15 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.handler.count</name>
-    <value>2</value>
+    <name>dfs.federation.router.async.rpc.handler.count</name>
+    <value>10</value>
     <description>
       The number of async handler for the router to handle RPC client requests.
     </description>
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.responder.count</name>
+    <name>dfs.federation.router.async.rpc.responder.count</name>
     <value>10</value>
     <description>
       The number of async responder for the router to handle responses.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
index 9b88e3b9956..b696572dd47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java
@@ -54,7 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {
 
   @Before
   public void setUp() throws IOException {
-    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
+    AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
     Configuration conf = new Configuration();
     RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
         ProtobufRpcEngine2.class);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
index 637a4b38ae7..4732dc778b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java
@@ -82,7 +82,7 @@ public class TestRouterClientSideTranslatorPB {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
+    AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
     conf = new HdfsConfiguration();
     cluster = (new MiniDFSCluster.Builder(conf))
         .numDataNodes(1).build();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
index f7dc29d37d5..893afddbc63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java
@@ -38,8 +38,8 @@
 
 import static 
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -80,8 +80,8 @@ public static void setUpCluster() throws Exception {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -108,12 +108,11 @@ public void setUp() throws IOException {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     routerAsyncRpcServer = Mockito.spy(routerRpcServer);
     
Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
     Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
index 46faf238b83..1cac778a809 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java
@@ -49,8 +49,8 @@
 
 import static 
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -94,8 +94,8 @@ public static void setUpCluster() throws Exception {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -122,12 +122,11 @@ public void setUp() throws IOException {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncErasureCoding = new AsyncErasureCoding(spy);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
index f7e4c4c3f35..eecc91002eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java
@@ -44,8 +44,8 @@
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertTrue;
 
@@ -87,8 +87,8 @@ public static void setUpCluster() throws Exception {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -116,12 +116,11 @@ public void setUp() throws IOException {
     router = cluster.getRandomRouter();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
     RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
     RouterRpcServer spy = Mockito.spy(routerRpcServer);
     Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
     asyncQuota = new AsyncQuota(router.getRouter(), spy);
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
index 09fe7c9db93..7290c0a0aee 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java
@@ -29,7 +29,7 @@
 
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -51,7 +51,7 @@ public static void globalSetUp() throws Exception {
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     // use async router.
-    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     setUp(routerConf);
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
index 0d6a23407b3..59c5b2de239 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java
@@ -57,8 +57,8 @@
 import static org.apache.hadoop.fs.permission.FsAction.ALL;
 import static 
org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
 import static 
org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -110,8 +110,8 @@ public static void setUpCluster() throws Exception {
 
     // Reduce the number of RPC clients threads to overload the Router easy
     routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
-    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
+    routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
     // We decrease the DN cache times to make the test faster
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
@@ -146,14 +146,13 @@ public void setup() throws Exception {
     rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics();
     routerFs = router.getFileSystem();
     routerRpcServer = router.getRouterRpcServer();
-    routerRpcServer.initAsyncThreadPool();
+    routerRpcServer.initAsyncThreadPools(routerConf);
 
     // Create a RouterAsyncRpcClient object
     asyncRpcClient = new RouterAsyncRpcClient(
         routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
         routerRpcServer.getRPCMonitor(),
-        routerRpcServer.getRouterStateIdContext(),
-        routerRpcServer.getAsyncRouterHandler());
+        routerRpcServer.getRouterStateIdContext());
 
     // Create a test file
     FSDataOutputStream fsDataOutputStream = routerFs.create(
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java
index 0ded95aa06b..ec1ff0ce97b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java
@@ -28,7 +28,7 @@
 
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
 import static 
org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -48,7 +48,7 @@ public static void globalSetUp() throws Exception {
     routerConf.setTimeDuration(
         RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
     // use async router.
-    routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true);
+    routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
     setUp(routerConf);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to