This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f66c89b6571490f9078a09724286d7db6df08fb5 Author: hfutatzhanghb <hfutzhan...@163.com> AuthorDate: Mon Jan 20 14:26:10 2025 +0800 HDFS-17651.[ARR] Async handler executor isolation (#7244). Contributed by hfutatzhanghb. Reviewed-by: Jian Zhang <keeprom...@apache.org> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../hdfs/protocolPB/AsyncRpcProtocolPBUtil.java | 10 +- .../server/federation/router/RBFConfigKeys.java | 25 ++-- .../server/federation/router/RouterRpcServer.java | 139 +++++++++++++++------ .../router/async/RouterAsyncRpcClient.java | 10 +- .../src/main/resources/hdfs-rbf-default.xml | 35 +++++- .../protocolPB/TestAsyncRpcProtocolPBUtil.java | 2 +- .../TestRouterClientSideTranslatorPB.java | 2 +- .../router/async/RouterAsyncProtocolTestBase.java | 13 +- .../router/async/TestRouterAsyncErasureCoding.java | 13 +- .../router/async/TestRouterAsyncQuota.java | 13 +- .../router/async/TestRouterAsyncRpc.java | 4 +- .../router/async/TestRouterAsyncRpcClient.java | 13 +- .../async/TestRouterAsyncRpcMultiDestination.java | 4 +- 13 files changed, 190 insertions(+), 93 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java index ffab0f1c487..43bbe037390 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java @@ -55,7 +55,7 @@ public final class AsyncRpcProtocolPBUtil { public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class); /** The executor used for handling responses asynchronously. */ - private static Executor worker; + private static Executor asyncResponderExecutor; private AsyncRpcProtocolPBUtil() {} @@ -97,7 +97,7 @@ public static <T, R> R asyncIpcClient( } catch (Exception ex) { throw warpCompletionException(ex); } - }, worker)); + }, asyncResponderExecutor)); return asyncReturn(clazz); } @@ -144,10 +144,10 @@ public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) { * Sets the executor used for handling responses asynchronously within * the utility class. * - * @param worker The executor to be used for handling responses asynchronously. + * @param asyncResponderExecutor The executor to be used for handling responses asynchronously. */ - public static void setWorker(Executor worker) { - AsyncRpcProtocolPBUtil.worker = worker; + public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) { + AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor; } @FunctionalInterface diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index ced6e07d985..955d98cfc85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -72,15 +72,22 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_RPC_ENABLE = FEDERATION_ROUTER_PREFIX + "rpc.enable"; public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; - public static final String DFS_ROUTER_RPC_ENABLE_ASYNC = - FEDERATION_ROUTER_PREFIX + "rpc.async.enable"; - public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false; - public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT = - FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count"; - public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2; - public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT = - FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count"; - public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10; + // HDFS Router Asynchronous RPC + public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY = + FEDERATION_ROUTER_PREFIX + "async.rpc.enable"; + public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false; + public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX = + FEDERATION_ROUTER_PREFIX + "async.rpc."; + // Example: ns1:count1,ns2:count2,ns3:count3 + public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count"; + public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT = ""; + public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count"; + public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; + public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; public static final String DFS_ROUTER_METRICS_ENABLE = FEDERATION_ROUTER_PREFIX + "metrics.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 060d207837f..77bebab4ade 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -18,6 +18,16 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT; @@ -26,16 +36,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; @@ -56,6 +58,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -63,8 +66,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -72,6 +75,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; @@ -209,6 +213,7 @@ import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.util.ReflectionUtils; +import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private static final Logger LOG = LoggerFactory.getLogger(RouterRpcServer.class); - private ExecutorService asyncRouterHandler; - private ExecutorService asyncRouterResponder; + + /** Name service keyword to identify fan-out calls. */ + public static final String CONCURRENT_NS = "concurrent"; /** Configuration for the RPC server. */ private Configuration conf; @@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, /** Schedule the router federation rename jobs. */ private BalanceProcedureScheduler fedRenameScheduler; private boolean enableAsync; + private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>(); + private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>(); + private ExecutorService routerAsyncResponderExecutor; + private ExecutorService routerDefaultAsyncHandlerExecutor; /** * Construct a router RPC server. @@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router, int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY, DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT); - this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, - DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT); - LOG.info("Router enable async {}", this.enableAsync); + this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, + DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT); + LOG.info("Router enable async rpc: {}", this.enableAsync); if (this.enableAsync) { - initAsyncThreadPool(); + initAsyncThreadPools(conf); } // Override Hadoop Common IPC setting int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY, @@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router, // Create the client if (this.enableAsync) { this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor, - routerStateIdContext, asyncRouterHandler); + this.namenodeResolver, this.rpcMonitor, routerStateIdContext); this.clientProto = new RouterAsyncClientProtocol(conf, this); this.nnProto = new RouterAsyncNamenodeProtocol(this); this.routerProto = new RouterAsyncUserProtocol(this); @@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router, /** * Init router async handlers and router async responders. + * @param configuration the configuration. */ - public void initAsyncThreadPool() { - int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, - DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT); - int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, - DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT); - if (asyncRouterHandler == null) { - LOG.info("init router async handler count: {}", asyncHandlerCount); - asyncRouterHandler = Executors.newFixedThreadPool( - asyncHandlerCount, new AsyncThreadFactory("router async handler ")); + public void initAsyncThreadPools(Configuration configuration) { + LOG.info("Begin initialize asynchronous handler and responder thread pool."); + initNsAsyncHandlerCount(); + Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration); + Set<String> unassignedNS = new HashSet<>(); + allConfiguredNS.add(CONCURRENT_NS); + + for (String nsId : allConfiguredNS) { + int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0); + LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + if (dedicatedHandlers > 0) { + initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers); + LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId); + } else { + unassignedNS.add(nsId); + } + } + + int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); + + if (!unassignedNS.isEmpty()) { + LOG.warn("Async handler unassigned ns: {}", unassignedNS); + LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault); + for (String nsId : unassignedNS) { + initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault); + } } - if (asyncRouterResponder == null) { - LOG.info("init router async responder count: {}", asyncResponderCount); - asyncRouterResponder = Executors.newFixedThreadPool( - asyncResponderCount, new AsyncThreadFactory("router async responder ")); + + int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, + DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT); + if (routerAsyncResponderExecutor == null) { + LOG.info("Initialize router async responder count: {}", asyncResponderCount); + routerAsyncResponderExecutor = Executors.newFixedThreadPool( + asyncResponderCount, new AsyncThreadFactory("Router Async Responder #")); + } + AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor); + + if (routerDefaultAsyncHandlerExecutor == null) { + LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); + routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool( + asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #")); + } + } + + private void initNsAsyncHandlerCount() { + String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT); + if (StringUtils.isEmpty(configNsHandler)) { + LOG.error( + "The value of config key: {} is empty. Will use default conf.", + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY); } - AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder); + String[] nsHandlers = configNsHandler.split(","); + for (String nsHandlerInfo : nsHandlers) { + String[] nsHandlerItems = nsHandlerInfo.split(":"); + if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) || + !StringUtils.isNumeric(nsHandlerItems[1])) { + LOG.error("The config key: {} is incorrect! The value is {}.", + DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo); + continue; + } + nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1])); + } + } + + private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) { + asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool( + dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #"))); } /** @@ -2426,8 +2489,12 @@ public boolean isAsync() { return this.enableAsync; } - public Executor getAsyncRouterHandler() { - return asyncRouterHandler; + public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() { + return asyncRouterHandlerExecutors; + } + + public ExecutorService getRouterAsyncHandlerDefaultExecutor() { + return routerDefaultAsyncHandlerExecutor; } private static class AsyncThreadFactory implements ThreadFactory { @@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements ThreadFactory { } @Override - public Thread newThread(Runnable r) { - return new Thread(r, namePrefix + threadNumber.getAndIncrement()); + public Thread newThread(@NonNull Runnable r) { + Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 32dfab93152..c214adf1f2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -57,7 +57,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; @@ -98,7 +97,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{ private final ActiveNamenodeResolver namenodeResolver; /** Optional perf monitor. */ private final RouterRpcMonitor rpcMonitor; - private final Executor asyncRouterHandler; /** * Create a router async RPC client to manage remote procedure calls to NNs. @@ -108,17 +106,15 @@ public class RouterAsyncRpcClient extends RouterRpcClient{ * @param resolver A NN resolver to determine the currently active NN in HA. * @param monitor Optional performance monitor. * @param routerStateIdContext the router state context object to hold the state ids for all - * @param asyncRouterHandler async router handler * namespaces. */ public RouterAsyncRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, - RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) { + RouterStateIdContext routerStateIdContext) { super(conf, router, resolver, monitor, routerStateIdContext); this.router = router; this.namenodeResolver = resolver; this.rpcMonitor = monitor; - this.asyncRouterHandler = asyncRouterHandler; } /** @@ -172,6 +168,7 @@ public Object invokeMethod( " with params " + Arrays.deepToString(params) + " from " + router.getRouterId()); } + String nsid = namenodes.get(0).getNameserviceId(); // transfer threadLocalContext to worker threads of executor. ThreadLocalContext threadLocalContext = new ThreadLocalContext(); asyncComplete(null); @@ -183,7 +180,8 @@ public Object invokeMethod( threadLocalContext.transfer(); invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes, useObserver, protocol, method, params); - }, asyncRouterHandler); + }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, + router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 81dbf4273d4..46d273ab522 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -49,13 +49,40 @@ </property> <property> - <name>dfs.federation.router.rpc.async.enable</name> + <name>dfs.federation.router.async.rpc.enable</name> <value>false</value> <description> If true, router will process the RPC request asynchronously. </description> </property> + <property> + <name>dfs.federation.router.async.rpc.ns.handler.count</name> + <value></value> + <description> + The number of asynchronous handlers per nameservice, separated by commas, internally separated by colons. + The identifier of nameservice is in dfs.nameservices configuration entry. + Such as: ns1:count1,ns2:count2,ns3:count3. + </description> + </property> + + <property> + <name>dfs.federation.router.async.rpc.responder.count</name> + <value>10</value> + <description> + For those nameservices not in dfs.federation.router.async.rpc.ns.handler.count configuration entry, + use this value as the asynchronous handler thread counts. + </description> + </property> + + <property> + <name>dfs.federation.router.async.rpc.responder.count</name> + <value>10</value> + <description> + The thread counts of async responder executor. + </description> + </property> + <property> <name>dfs.federation.router.rpc-address</name> <value>0.0.0.0:8888</value> @@ -110,15 +137,15 @@ </property> <property> - <name>dfs.federation.router.rpc.async.handler.count</name> - <value>2</value> + <name>dfs.federation.router.async.rpc.handler.count</name> + <value>10</value> <description> The number of async handler for the router to handle RPC client requests. </description> </property> <property> - <name>dfs.federation.router.rpc.async.responder.count</name> + <name>dfs.federation.router.async.rpc.responder.count</name> <value>10</value> <description> The number of async responder for the router to handle responses. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java index 9b88e3b9956..b696572dd47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestAsyncRpcProtocolPBUtil.java @@ -54,7 +54,7 @@ public class TestAsyncRpcProtocolPBUtil { @Before public void setUp() throws IOException { - AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool()); + AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool()); Configuration conf = new Configuration(); RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java index 637a4b38ae7..4732dc778b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestRouterClientSideTranslatorPB.java @@ -82,7 +82,7 @@ public class TestRouterClientSideTranslatorPB { @BeforeClass public static void setUp() throws Exception { - AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool()); + AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool()); conf = new HdfsConfiguration(); cluster = (new MiniDFSCluster.Builder(conf)) .numDataNodes(1).build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java index f7dc29d37d5..893afddbc63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncProtocolTestBase.java @@ -38,8 +38,8 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.junit.Assert.assertTrue; /** @@ -80,8 +80,8 @@ public static void setUpCluster() throws Exception { // Reduce the number of RPC clients threads to overload the Router easy routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1); // We decrease the DN cache times to make the test faster routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); @@ -108,12 +108,11 @@ public void setUp() throws IOException { router = cluster.getRandomRouter(); routerFs = router.getFileSystem(); routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); + routerRpcServer.initAsyncThreadPools(routerConf); RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext(), - routerRpcServer.getAsyncRouterHandler()); + routerRpcServer.getRouterStateIdContext()); routerAsyncRpcServer = Mockito.spy(routerRpcServer); Mockito.when(routerAsyncRpcServer.getRPCClient()).thenReturn(asyncRpcClient); Mockito.when(routerAsyncRpcServer.isAsync()).thenReturn(true); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java index 46faf238b83..1cac778a809 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncErasureCoding.java @@ -49,8 +49,8 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -94,8 +94,8 @@ public static void setUpCluster() throws Exception { // Reduce the number of RPC clients threads to overload the Router easy routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1); // We decrease the DN cache times to make the test faster routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); @@ -122,12 +122,11 @@ public void setUp() throws IOException { router = cluster.getRandomRouter(); routerFs = router.getFileSystem(); routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); + routerRpcServer.initAsyncThreadPools(routerConf); RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext(), - routerRpcServer.getAsyncRouterHandler()); + routerRpcServer.getRouterStateIdContext()); RouterRpcServer spy = Mockito.spy(routerRpcServer); Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); asyncErasureCoding = new AsyncErasureCoding(spy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java index f7e4c4c3f35..eecc91002eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncQuota.java @@ -44,8 +44,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertTrue; @@ -87,8 +87,8 @@ public static void setUpCluster() throws Exception { // Reduce the number of RPC clients threads to overload the Router easy routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1); // We decrease the DN cache times to make the test faster routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); @@ -116,12 +116,11 @@ public void setUp() throws IOException { router = cluster.getRandomRouter(); routerFs = router.getFileSystem(); routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); + routerRpcServer.initAsyncThreadPools(routerConf); RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext(), - routerRpcServer.getAsyncRouterHandler()); + routerRpcServer.getRouterStateIdContext()); RouterRpcServer spy = Mockito.spy(routerRpcServer); Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient); asyncQuota = new AsyncQuota(router.getRouter(), spy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java index 09fe7c9db93..7290c0a0aee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -51,7 +51,7 @@ public static void globalSetUp() throws Exception { routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); // use async router. - routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true); + routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); setUp(routerConf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java index 0d6a23407b3..59c5b2de239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcClient.java @@ -57,8 +57,8 @@ import static org.apache.hadoop.fs.permission.FsAction.ALL; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -110,8 +110,8 @@ public static void setUpCluster() throws Exception { // Reduce the number of RPC clients threads to overload the Router easy routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); - routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1); // We decrease the DN cache times to make the test faster routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); @@ -146,14 +146,13 @@ public void setup() throws Exception { rpcMetrics = router.getRouter().getRpcServer().getRPCMetrics(); routerFs = router.getFileSystem(); routerRpcServer = router.getRouterRpcServer(); - routerRpcServer.initAsyncThreadPool(); + routerRpcServer.initAsyncThreadPools(routerConf); // Create a RouterAsyncRpcClient object asyncRpcClient = new RouterAsyncRpcClient( routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), - routerRpcServer.getRouterStateIdContext(), - routerRpcServer.getAsyncRouterHandler()); + routerRpcServer.getRouterStateIdContext()); // Create a test file FSDataOutputStream fsDataOutputStream = routerFs.create( diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java index 0ded95aa06b..ec1ff0ce97b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcMultiDestination.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.junit.Assert.assertArrayEquals; @@ -48,7 +48,7 @@ public static void globalSetUp() throws Exception { routerConf.setTimeDuration( RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); // use async router. - routerConf.setBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC, true); + routerConf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); setUp(routerConf); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org