Hexiaoqiao commented on code in PR #7244:
URL: https://github.com/apache/hadoop/pull/7244#discussion_r1919698079


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml:
##########
@@ -49,13 +49,39 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.enable</name>
+    <name>dfs.federation.router.async.rpc.enable</name>
     <value>false</value>
     <description>
       If true, router will process the RPC request asynchronously.
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.async.rpc.ns.handler.count</name>
+    <value>nsPlaceholder1:0,nsPlaceholder2:0</value>

Review Comment:
   Suggest leaving this empty, because the placeholder is not a valid value. Thanks.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java:
##########
@@ -72,15 +72,23 @@ public class RBFConfigKeys extends 
CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_RPC_ENABLE =
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
-  public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
-  public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
-  public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
-  public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
+  // HDFS Router Asynchronous RPC
+  public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
+      FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
+  public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
+  public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
+          FEDERATION_ROUTER_PREFIX + "async.rpc.";
+  // Example: ns1:count1,ns2:count2,ns3:count3
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT =

Review Comment:
   Leave this empty too, as suggested in the comment above.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);

Review Comment:
   I'm not fond of the format of this value, but I have no better idea for now. ^_^



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The config key: {} is incorrect! The value is empty.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+      configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
     }
-    AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+    String[] nsHandlers = configNsHandler.split(",");
+    for (String nsHandlerInfo : nsHandlers) {
+      String[] nsHandlerItems = nsHandlerInfo.split(":");
+      if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) 
||
+          !StringUtils.isNumeric(nsHandlerItems[1])) {
+        LOG.error("The config key: {} is incorrect! The value is {}.",
+            DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
+        continue;
+      }
+      nsAsyncHandlerCount.put(nsHandlerItems[0], 
Integer.parseInt(nsHandlerItems[1]));

Review Comment:
   Will this cause unexpected behavior if `nsHandlerItems[0]` is not a valid
namespace name? Would it be better to check first?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The config key: {} is incorrect! The value is empty.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+      configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;

Review Comment:
   The default value 'nsPlaceholder1:0,nsPlaceholder2:0' is invalid; we should
avoid using it in a production cluster. I suggest leaving this default value
empty and giving every `ns` one static value, such as 10, as in the current implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to