hfutatzhanghb commented on code in PR #7244:
URL: https://github.com/apache/hadoop/pull/7244#discussion_r1919723502
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
/**
* Init router async handlers and router async responders.
+ * @param configuration the configuration.
*/
- public void initAsyncThreadPool() {
- int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
- DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
- int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
- DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
- if (asyncRouterHandler == null) {
- LOG.info("init router async handler count: {}", asyncHandlerCount);
- asyncRouterHandler = Executors.newFixedThreadPool(
- asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+ public void initAsyncThreadPools(Configuration configuration) {
+ LOG.info("Begin initialize asynchronous handler and responder thread
pool.");
+ initNsAsyncHandlerCount();
+ Set<String> allConfiguredNS =
FederationUtil.getAllConfiguredNS(configuration);
+ Set<String> unassignedNS = new HashSet<>();
+ allConfiguredNS.add(CONCURRENT_NS);
+
+ for (String nsId : allConfiguredNS) {
+ int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+ LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+ if (dedicatedHandlers > 0) {
+ initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+ LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers,
nsId);
+ } else {
+ unassignedNS.add(nsId);
+ }
+ }
+
+ int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+ if (!unassignedNS.isEmpty()) {
+ LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+ LOG.info("Use default async handler count {} for unassigned ns.",
asyncHandlerCountDefault);
+ for (String nsId : unassignedNS) {
+ initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+ }
}
- if (asyncRouterResponder == null) {
- LOG.info("init router async responder count: {}", asyncResponderCount);
- asyncRouterResponder = Executors.newFixedThreadPool(
- asyncResponderCount, new AsyncThreadFactory("router async responder
"));
+
+ int asyncResponderCount =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+ DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+ if (routerAsyncResponderExecutor == null) {
+ LOG.info("Initialize router async responder count: {}",
asyncResponderCount);
+ routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+ asyncResponderCount, new AsyncThreadFactory("Router Async Responder
#"));
+ }
+
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+ if (routerDefaultAsyncHandlerExecutor == null) {
+ LOG.info("init router async default executor handler count: {}",
asyncHandlerCountDefault);
+ routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+ asyncHandlerCountDefault, new AsyncThreadFactory("Router Async
Default Handler #"));
+ }
+ }
+
+ private void initNsAsyncHandlerCount() {
+ String configNsHandler =
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
Review Comment:
Thanks, sir. Yes, I actually don't like this config either, but if we do not use
this format, we may have to introduce more configs, such as:
`dfs.federation.router.async.rpc.hdfs1.handler.count`
`dfs.federation.router.async.rpc.hdfs2.handler.count`
`dfs.federation.router.async.rpc.hdfs3.handler.count`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
/**
* Init router async handlers and router async responders.
+ * @param configuration the configuration.
*/
- public void initAsyncThreadPool() {
- int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
- DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
- int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
- DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
- if (asyncRouterHandler == null) {
- LOG.info("init router async handler count: {}", asyncHandlerCount);
- asyncRouterHandler = Executors.newFixedThreadPool(
- asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+ public void initAsyncThreadPools(Configuration configuration) {
+ LOG.info("Begin initialize asynchronous handler and responder thread
pool.");
+ initNsAsyncHandlerCount();
+ Set<String> allConfiguredNS =
FederationUtil.getAllConfiguredNS(configuration);
+ Set<String> unassignedNS = new HashSet<>();
+ allConfiguredNS.add(CONCURRENT_NS);
+
+ for (String nsId : allConfiguredNS) {
+ int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+ LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+ if (dedicatedHandlers > 0) {
+ initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+ LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers,
nsId);
+ } else {
+ unassignedNS.add(nsId);
+ }
+ }
+
+ int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+ if (!unassignedNS.isEmpty()) {
+ LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+ LOG.info("Use default async handler count {} for unassigned ns.",
asyncHandlerCountDefault);
+ for (String nsId : unassignedNS) {
+ initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+ }
}
- if (asyncRouterResponder == null) {
- LOG.info("init router async responder count: {}", asyncResponderCount);
- asyncRouterResponder = Executors.newFixedThreadPool(
- asyncResponderCount, new AsyncThreadFactory("router async responder
"));
+
+ int asyncResponderCount =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+ DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+ if (routerAsyncResponderExecutor == null) {
+ LOG.info("Initialize router async responder count: {}",
asyncResponderCount);
+ routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+ asyncResponderCount, new AsyncThreadFactory("Router Async Responder
#"));
+ }
+
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+ if (routerDefaultAsyncHandlerExecutor == null) {
+ LOG.info("init router async default executor handler count: {}",
asyncHandlerCountDefault);
+ routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+ asyncHandlerCountDefault, new AsyncThreadFactory("Router Async
Default Handler #"));
+ }
+ }
+
+ private void initNsAsyncHandlerCount() {
+ String configNsHandler =
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+ if (StringUtils.isEmpty(configNsHandler)) {
+ LOG.error(
+ "The config key: {} is incorrect! The value is empty.",
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+ configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]