[ https://issues.apache.org/jira/browse/HDFS-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913996#comment-17913996 ]
ASF GitHub Bot commented on HDFS-17651: --------------------------------------- hfutatzhanghb commented on code in PR #7244: URL: https://github.com/apache/hadoop/pull/7244#discussion_r1919723502 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java: ########## @@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router, /** * Init router async handlers and router async responders. + * @param configuration the configuration. */ - public void initAsyncThreadPool() { - int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, - DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); - int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, - DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); - if (asyncRouterHandler == null) { - LOG.info("init router async handler count: {}", asyncHandlerCount); - asyncRouterHandler = Executors.newFixedThreadPool( - asyncHandlerCount, new AsyncThreadFactory("router async handler ")); + public void initAsyncThreadPools(Configuration configuration) { + LOG.info("Begin initialize asynchronous handler and responder thread pool."); + initNsAsyncHandlerCount(); + Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration); + Set<String> unassignedNS = new HashSet<>(); + allConfiguredNS.add(CONCURRENT_NS); + + for (String nsId : allConfiguredNS) { + int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0); + LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + if (dedicatedHandlers > 0) { + initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers); + LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId); + } else { + unassignedNS.add(nsId); + } + } + + int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); + + if (!unassignedNS.isEmpty()) { + LOG.warn("Async handler unassigned 
ns: {}", unassignedNS); + LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault); + for (String nsId : unassignedNS) { + initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault); + } } - if (asyncRouterResponder == null) { - LOG.info("init router async responder count: {}", asyncResponderCount); - asyncRouterResponder = Executors.newFixedThreadPool( - asyncResponderCount, new AsyncThreadFactory("router async responder ")); + + int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, + DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT); + if (routerAsyncResponderExecutor == null) { + LOG.info("Initialize router async responder count: {}", asyncResponderCount); + routerAsyncResponderExecutor = Executors.newFixedThreadPool( + asyncResponderCount, new AsyncThreadFactory("Router Async Responder #")); + } + AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor); + + if (routerDefaultAsyncHandlerExecutor == null) { + LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); + routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool( + asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #")); + } + } + + private void initNsAsyncHandlerCount() { + String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT); Review Comment: Thanks Sir, yes. Actually I don't like this config too, but if we do not use this format , we may introduce more configs. 
like: `dfs.federation.router.async.rpc.hdfs1.handler.count` `dfs.federation.router.async.rpc.hdfs2.handler.count` `dfs.federation.router.async.rpc.hdfs3.handler.count` ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java: ########## @@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router, /** * Init router async handlers and router async responders. + * @param configuration the configuration. */ - public void initAsyncThreadPool() { - int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, - DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); - int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, - DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); - if (asyncRouterHandler == null) { - LOG.info("init router async handler count: {}", asyncHandlerCount); - asyncRouterHandler = Executors.newFixedThreadPool( - asyncHandlerCount, new AsyncThreadFactory("router async handler ")); + public void initAsyncThreadPools(Configuration configuration) { + LOG.info("Begin initialize asynchronous handler and responder thread pool."); + initNsAsyncHandlerCount(); + Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration); + Set<String> unassignedNS = new HashSet<>(); + allConfiguredNS.add(CONCURRENT_NS); + + for (String nsId : allConfiguredNS) { + int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0); + LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + if (dedicatedHandlers > 0) { + initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers); + LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId); + } else { + unassignedNS.add(nsId); + } + } + + int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); + + if (!unassignedNS.isEmpty()) { + LOG.warn("Async handler unassigned ns: {}", unassignedNS); + 
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault); + for (String nsId : unassignedNS) { + initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault); + } } - if (asyncRouterResponder == null) { - LOG.info("init router async responder count: {}", asyncResponderCount); - asyncRouterResponder = Executors.newFixedThreadPool( - asyncResponderCount, new AsyncThreadFactory("router async responder ")); + + int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, + DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT); + if (routerAsyncResponderExecutor == null) { + LOG.info("Initialize router async responder count: {}", asyncResponderCount); + routerAsyncResponderExecutor = Executors.newFixedThreadPool( + asyncResponderCount, new AsyncThreadFactory("Router Async Responder #")); + } + AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor); + + if (routerDefaultAsyncHandlerExecutor == null) { + LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); + routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool( + asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #")); + } + } + + private void initNsAsyncHandlerCount() { + String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT); + if (StringUtils.isEmpty(configNsHandler)) { + LOG.error( + "The config key: {} is incorrect! The value is empty.", + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY); + configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT; Review Comment: fixed > [ARR] Async handler executor isolation. 
> --------------------------------------- > > Key: HDFS-17651 > URL: https://issues.apache.org/jira/browse/HDFS-17651 > Project: Hadoop HDFS > Issue Type: Sub-task > Reporter: farmmamba > Assignee: farmmamba > Priority: Major > Labels: pull-request-available > > The main purpose of this PR is to isolate each nameservice by letting each > nameservice have its own async handler thread pool. > Consider the following situation: > When a downstream nameservice ns1 has poor performance, the threads in the async > handler thread pool will be occupied adding calls to ns1's > Connection#calls. There will be no available threads to handle the other, healthy > nameservices' RPC requests asynchronously. Therefore, it is better to isolate > the async handler thread pools of different nameservices. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org