[ 
https://issues.apache.org/jira/browse/HDFS-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913996#comment-17913996
 ] 

ASF GitHub Bot commented on HDFS-17651:
---------------------------------------

hfutatzhanghb commented on code in PR #7244:
URL: https://github.com/apache/hadoop/pull/7244#discussion_r1919723502


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);

Review Comment:
   Thanks Sir, yes. Actually I don't like this config either, but if we do not use 
this format, we may need to introduce more configs, like:
   `dfs.federation.router.async.rpc.hdfs1.handler.count`
   `dfs.federation.router.async.rpc.hdfs2.handler.count`
   `dfs.federation.router.async.rpc.hdfs3.handler.count` 



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The config key: {} is incorrect! The value is empty.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+      configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;

Review Comment:
   fixed





> [ARR] Async handler executor isolation.
> ---------------------------------------
>
>                 Key: HDFS-17651
>                 URL: https://issues.apache.org/jira/browse/HDFS-17651
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: farmmamba
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> The main purpose of this PR is to isolate each nameservice by letting each 
> nameservice have its own async handler thread pool.
> Consider the following situation:
> When a downstream nameservice ns1 has poor performance, the threads in the async 
> handler thread pool will be occupied adding calls to ns1's 
> Connection#calls. There will be no available threads to handle the other 
> nameservices' RPC requests asynchronously. Therefore, it is better to isolate 
> the async handler thread pools of different nameservices.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to