[
https://issues.apache.org/jira/browse/HDFS-16539?focusedWorklogId=757751&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-757751
]
ASF GitHub Bot logged work on HDFS-16539:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/22 04:12
Start Date: 18/Apr/22 04:12
Worklog Time Spent: 10m
Work Description: ferhui commented on code in PR #4168:
URL: https://github.com/apache/hadoop/pull/4168#discussion_r851862476
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java:
##########
@@ -1585,21 +1591,20 @@ private void acquirePermit(
/**
* Release permit for specific nsId after processing against downstream
* nsId is completed.
- *
- * @param nsId Identifier of the block pool.
+ * @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
+ * @param controller fairness policy controller to release permit from
*/
- private void releasePermit(
- final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
{
- if (routerRpcFairnessPolicyController != null) {
- routerRpcFairnessPolicyController.releasePermit(nsId);
+ private void releasePermit(final String nsId, final UserGroupInformation ugi,
+ final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
+ if (controller != null) {
+ controller.releasePermit(nsId);
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
m.getMethodName());
}
}
- @VisibleForTesting
public RouterRpcFairnessPolicyController
getRouterRpcFairnessPolicyController() {
return routerRpcFairnessPolicyController;
Review Comment:
routerRpcFairnessPolicyController should be volatile?
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java:
##########
@@ -1622,4 +1627,9 @@ public Long getAcceptedPermitForNs(String ns) {
return acceptedPermitsPerNs.containsKey(ns) ?
acceptedPermitsPerNs.get(ns).longValue() : 0L;
}
+
+ public String refreshFairnessPolicyController(Configuration conf) {
+ routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
Review Comment:
newInstance can return null
maybe we need to check it and assign it to routerRpcFairnessPolicyController
if it is not null
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+
+import static
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterRefreshFairnessPolicyController {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
+
+ private StateStoreDFSCluster cluster;
+
+ @After
+ public void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Before
+ public void setupCluster() throws Exception {
+ cluster = new StateStoreDFSCluster(false, 1);
+ Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();
+
+ // Handlers concurrent:ns0 = 3:3
+ conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ StaticRouterRpcFairnessPolicyController.class,
RouterRpcFairnessPolicyController.class);
+ conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 6);
+
+ // Datanodes not needed for this test.
+ cluster.setNumDatanodesPerNameservice(0);
+
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ }
+
+ @Test
+ public void testRefreshStaticChangeHandlers() throws Exception {
+ MiniRouterDFSCluster.RouterContext routerContext =
cluster.getRandomRouter();
+ RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class);
+ RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
+ final long sleepTime = 3000;
+ Mockito.doAnswer(invocationOnMock -> {
+ Thread.sleep(sleepTime);
+ return null;
+ }).when(client)
+ .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any());
+
+ final int nThreads = 3;
+ Thread[] threadAcquirePermits = new Thread[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ Thread threadAcquirePermit = new Thread(() -> {
+ try {
+ client.invokeSingle("ns0", dummyMethod);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ threadAcquirePermits[i] = threadAcquirePermit;
+ threadAcquirePermits[i].start();
+ }
+
+ Thread.sleep(1000);
+
+ Configuration conf = routerContext.getConf();
+ final int newNs0Permits = 1; // Set to smaller than current handler count
(3)
+ conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0",
newNs0Permits);
+ Thread threadRefreshController = new Thread(() -> {
+ client.refreshFairnessPolicyController(routerContext.getConf());
+ });
+ threadRefreshController.start();
+ threadRefreshController.join();
+
+ StaticRouterRpcFairnessPolicyController controller =
+ (StaticRouterRpcFairnessPolicyController)
client.getRouterRpcFairnessPolicyController();
+ for (int i = 0; i < nThreads; i++) {
+ threadAcquirePermits[i].join();
+ }
+
+ // Controller should now have 5:1 handlers for concurrent:ns0
+ for (int i = 0; i < 5; i++) {
+ assertTrue(controller.acquirePermit(CONCURRENT_NS));
+ }
+ // Invocations before refresh should not interfere with invocations after
+ assertTrue(controller.acquirePermit("ns0"));
Review Comment:
It's better to use metrics to show that it works.
can we add a case to show that the requests can be handled by the orignal
handler after we change it to a new one?
Issue Time Tracking
-------------------
Worklog Id: (was: 757751)
Time Spent: 50m (was: 40m)
> RBF: Support refreshing/changing router fairness policy controller without
> rebooting router
> -------------------------------------------------------------------------------------------
>
> Key: HDFS-16539
> URL: https://issues.apache.org/jira/browse/HDFS-16539
> Project: Hadoop HDFS
> Issue Type: Improvement
> Reporter: Felix N
> Assignee: Felix N
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Add support for refreshing/changing router fairness policy controller without
> the need to reboot a router.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]