This is an automated email from the ASF dual-hosted git repository. inigoiri pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 10365444802d HDFS-17302. RBF: ProportionRouterRpcFairnessPolicyController-Sharing and isolation. (#6380) 10365444802d is described below commit 10365444802d4e33adde1b4ec188e1e251ad1979 Author: Jian Zhang <1361320...@qq.com> AuthorDate: Sat Jan 20 06:02:21 2024 +0800 HDFS-17302. RBF: ProportionRouterRpcFairnessPolicyController-Sharing and isolation. (#6380) --- .../AbstractRouterRpcFairnessPolicyController.java | 5 + .../NoRouterRpcFairnessPolicyController.java | 5 + ...roportionRouterRpcFairnessPolicyController.java | 99 ++++++++++ .../RouterRpcFairnessPolicyController.java | 8 + .../server/federation/router/RBFConfigKeys.java | 4 + .../src/main/resources/hdfs-rbf-default.xml | 13 ++ ...roportionRouterRpcFairnessPolicyController.java | 208 +++++++++++++++++++++ .../fairness/TestRouterHandlersFairness.java | 135 +++++++++++-- .../federation/router/TestRBFConfigFields.java | 2 + 9 files changed, 461 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index 570480d49ef5..67c82729efef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -106,4 +106,9 @@ public class AbstractRouterRpcFairnessPolicyController }); return json.toString(); } + + @Override + public boolean contains(String nsId) { + return permits.containsKey(nsId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java index e0a5c31a2d0f..e408f3821a83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java @@ -56,4 +56,9 @@ public class NoRouterRpcFairnessPolicyController implements public int getAvailablePermits(String nsId) { return 0; } + + @Override + public boolean contains(String nsId) { + return true; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ProportionRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ProportionRouterRpcFairnessPolicyController.java new file mode 100644 index 000000000000..71b5623901a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ProportionRouterRpcFairnessPolicyController.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Set; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + +/** + * Proportion fairness policy extending {@link AbstractRouterRpcFairnessPolicyController} + * and fetching proportion of handlers from configuration for all available name services, + * based on the proportion and the total number of handlers, calculate the handlers of all ns. + * The handlers count will not change for this controller. + */ +public class ProportionRouterRpcFairnessPolicyController extends + AbstractRouterRpcFairnessPolicyController{ + + private static final Logger LOG = + LoggerFactory.getLogger(ProportionRouterRpcFairnessPolicyController.class); + // For unregistered ns, the default ns is used, + // so the configuration can be simplified if the handler ratio of all ns is 1, + // and transparent expansion of new ns can be supported. + private static final String DEFAULT_NS = "default_ns"; + + public ProportionRouterRpcFairnessPolicyController(Configuration conf){ + init(conf); + } + + @Override + public void init(Configuration conf) { + super.init(conf); + // Total handlers configured to process all incoming Rpc. + int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + + LOG.info("Handlers available for fairness assignment {} ", handlerCount); + + // Get all name services configured + Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + + // Insert the concurrent nameservice into the set to process together + allConfiguredNS.add(CONCURRENT_NS); + + // Insert the default nameservice into the set to process together + allConfiguredNS.add(DEFAULT_NS); + for (String nsId : allConfiguredNS) { + double dedicatedHandlerProportion = conf.getDouble( + DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + nsId, + DFS_ROUTER_FAIR_HANDLER_PROPORTION_DEFAULT); + int dedicatedHandlers = (int) (dedicatedHandlerProportion * handlerCount); + LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); + // Each NS should have at least one handler assigned. + if (dedicatedHandlers <= 0) { + dedicatedHandlers = 1; + } + insertNameServiceWithPermits(nsId, dedicatedHandlers); + LOG.info("Assigned {} handlers to nsId {} ", dedicatedHandlers, nsId); + } + } + + @Override + public boolean acquirePermit(String nsId) { + if (contains(nsId)) { + return super.acquirePermit(nsId); + } + return super.acquirePermit(DEFAULT_NS); + } + + @Override + public void releasePermit(String nsId) { + if (contains(nsId)) { + super.releasePermit(nsId); + } + super.releasePermit(DEFAULT_NS); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java index 90d8f7dd47da..cd9ae139c2a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java @@ -77,4 +77,12 @@ public interface RouterRpcFairnessPolicyController { * @return the available handler for each name service. */ int getAvailablePermits(String nsId); + + /** + * Determine whether ns has registered handlers. + * + * @param nsId name service id. + * @return true if the ns has registered handlers, false in other cases. + */ + boolean contains(String nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7000a72b3a05..5189b6b13459 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -413,6 +413,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout"; public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(1); + public static final String DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.proportion."; + public static final double DFS_ROUTER_FAIR_HANDLER_PROPORTION_DEFAULT = + 0.1; // HDFS Router Federation Rename. public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 43bd17d75f47..ec4fa46ecc35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -818,6 +818,19 @@ </description> </property> + <property> + <name>dfs.federation.router.fairness.handler.proportion.EXAMPLENAMESERVICE</name> + <value>0.1</value> + <description> + Dedicated handler proportion for nameservice EXAMPLENAMESERVICE. + The range of this value is [0, 1], and the data type is float. + If this value is configured as x, and the total number of handlers + (configed by dfs.federation.router.handler.count) of the router is y, + then the maximum number of handlers for the EXAMPLENAMESERVICE is z=(int) x*y; + If z is 0, z is reset to 1, ensuring the ns has at least one handler. + </description> + </property> + <property> <name>dfs.federation.router.federation.rename.bandwidth</name> <value>10</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestProportionRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestProportionRouterRpcFairnessPolicyController.java new file mode 100644 index 000000000000..b9ccc7061d17 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestProportionRouterRpcFairnessPolicyController.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.util.Time; +import org.junit.Test; + + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test functionality of {@link ProportionRouterRpcFairnessPolicyController}. + */ +public class TestProportionRouterRpcFairnessPolicyController { + private static String nameServices = + "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + + /** + * Do not configure handlers for ns, + * 0.1 of the total number of handlers will be used by default. + */ + @Test + public void testHandlerAllocationDefault() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(30); + // By default, each ns has 3 (30*0.1) handlers. + // So the first 3 requests were successful. + for (int i=0; i<3; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue( + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + // The 4th access failed because there was no available handler. + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + + // Release a handler. + routerRpcFairnessPolicyController.releasePermit("ns1"); + routerRpcFairnessPolicyController.releasePermit("ns2"); + routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS); + + // The next request is successful. + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + /** + * The number of handlers is configured for ns. + */ + @Test + public void testHandlerAllocationPreconfigured() { + Configuration conf = createConf(40); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + "ns1", 0.5); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // ns1 should have 20 permits allocated + for (int i=0; i<20; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + + // ns2 should have 4 permits. + // concurrent should have 4 permits. + for (int i=0; i<4; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue( + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + + /** + * The handlers have not been obtained after a certain period of time. + */ + @Test + public void testAcquireTimeout() { + Configuration conf = createConf(40); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + "ns1", 0.5); + conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // ns1 should have 20 permits allocated + for (int i = 0; i < 20; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + long acquireBeginTimeMs = Time.monotonicNow(); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs; + + // There are some other operations, so acquireTimeMs >= 100ms. + assertTrue(acquireTimeMs >= 100); + } + + /** + * If 0 handlers are configured for ns, one handler will be provided for ns by default. + */ + @Test + public void testAllocationWithZeroProportion() { + Configuration conf = createConf(40); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + "ns1", 0); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // ns1 should have 1 permit allocated + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + + /** + * The sum of handlers of all ns is supported to be + * greater than the handlers available on the router, so that ns can share idle handlers. + */ + @Test + public void testAllocationHandlersGreaterThanCount() { + Configuration conf = createConf(40); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + "ns1", 0.8); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + "ns2", 0.8); + conf.setDouble(DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX + CONCURRENT_NS, 1); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // ns1 32 permit allocated + // ns2 32 permit allocated + for (int i = 0; i < 32; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + } + // CONCURRENT_NS 40 permit allocated + for (int i=0; i < 40; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + } + } + + /** + * When accessing an unregistered ns, it can also be successful. + * Therefore, to support cluster expansion with new ns, + * you only need to add a mount to the router to access it without reconfiguring handlers. + */ + @Test + public void testTransparentExtension() { + Configuration conf = createConf(40); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + // Access unregistered ns. + // There are 4 (40*0.1) handlers by default. + for (int i=0; i<4; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns_unregistered")); + } + + // The 5th access failed because there was no available handler. + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns_unregistered")); + + // Release a handler, the next request is successful. + routerRpcFairnessPolicyController.releasePermit("ns_unregistered"); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns_unregistered")); + } + + private RouterRpcFairnessPolicyController getFairnessPolicyController( + int handlers) { + return FederationUtil.newFairnessPolicyController(createConf(handlers)); + } + + private Configuration createConf(int handlers) { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers); + conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices); + conf.setClass( + RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + ProportionRouterRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java index 9b58c82bce79..36acc2a12052 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.federation.fairness; +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -27,6 +29,10 @@ import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -46,18 +52,66 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.LambdaTestUtils; import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Test the Router handlers fairness control rejects and accepts requests. */ +@RunWith(Parameterized.class) public class TestRouterHandlersFairness { private static final Logger LOG = LoggerFactory.getLogger(TestRouterHandlersFairness.class); private StateStoreDFSCluster cluster; + private Map<String, Integer> expectedHandlerPerNs; + private Class<RouterRpcFairnessPolicyController> policyControllerClass; + private int handlerCount; + private Map<String, String> configuration; + + /** + * Initialize test parameters. + * + * @param policyControllerClass RouterRpcFairnessPolicyController type. + * @param handlerCount The total number of handlers in the router. + * @param configuration Custom configuration. + * @param expectedHandlerPerNs The number of handlers expected for each ns. + */ + public TestRouterHandlersFairness( + Class<RouterRpcFairnessPolicyController> policyControllerClass, int handlerCount, + Map<String, String> configuration, Map<String, Integer> expectedHandlerPerNs) { + this.expectedHandlerPerNs = expectedHandlerPerNs; + this.policyControllerClass = policyControllerClass; + this.handlerCount = handlerCount; + this.configuration = configuration; + } + + @Parameterized.Parameters + public static Collection primes() { + return Arrays.asList(new Object[][]{ + { + // Test StaticRouterRpcFairnessPolicyController. + StaticRouterRpcFairnessPolicyController.class, + 3, + setConfiguration(null), + expectedHandlerPerNs("ns0:1, ns1:1, concurrent:1") + }, + { + // Test ProportionRouterRpcFairnessPolicyController. + ProportionRouterRpcFairnessPolicyController.class, + 20, + setConfiguration( + "dfs.federation.router.fairness.handler.proportion.ns0=0.5, " + + "dfs.federation.router.fairness.handler.proportion.ns1=0.8, " + + "dfs.federation.router.fairness.handler.proportion.concurrent=1" + ), + expectedHandlerPerNs("ns0:10, ns1:16, concurrent:20") + } + }); + } @After public void cleanup() { @@ -69,6 +123,7 @@ public class TestRouterHandlersFairness { private void setupCluster(boolean fairnessEnable, boolean ha) throws Exception { + LOG.info("Test {}", policyControllerClass.getSimpleName()); // Build and start a federated cluster cluster = new StateStoreDFSCluster(ha, 2); Configuration routerConf = new RouterConfigBuilder() @@ -80,13 +135,17 @@ public class TestRouterHandlersFairness { if (fairnessEnable) { routerConf.setClass( RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, - StaticRouterRpcFairnessPolicyController.class, + this.policyControllerClass, RouterRpcFairnessPolicyController.class); } - // With two name services configured, each nameservice has 1 permit and - // fan-out calls have 1 permit. - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 3); + routerConf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 10, TimeUnit.MILLISECONDS); + + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, this.handlerCount); + + for(Map.Entry<String, String> conf : configuration.entrySet()) { + routerConf.set(conf.getKey(), conf.getValue()); + } // Datanodes not needed for this test. cluster.setNumDatanodesPerNameservice(0); @@ -191,15 +250,19 @@ public class TestRouterHandlersFairness { if (isConcurrent) { LOG.info("Taking fanout lock first"); // take the lock for concurrent NS to block fanout calls - assertTrue(routerContext.getRouter().getRpcServer() - .getRPCClient().getRouterRpcFairnessPolicyController() - .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS)); + for(int i = 0; i < expectedHandlerPerNs.get(CONCURRENT_NS); i++) { + assertTrue(routerContext.getRouter().getRpcServer() + .getRPCClient().getRouterRpcFairnessPolicyController() + .acquirePermit(CONCURRENT_NS)); + } } else { for (String ns : cluster.getNameservices()) { LOG.info("Taking lock first for ns: {}", ns); - assertTrue(routerContext.getRouter().getRpcServer() - .getRPCClient().getRouterRpcFairnessPolicyController() - .acquirePermit(ns)); + for(int i = 0; i < expectedHandlerPerNs.get(ns); i++) { + assertTrue(routerContext.getRouter().getRpcServer() + .getRPCClient().getRouterRpcFairnessPolicyController() + .acquirePermit(ns)); + } } } } @@ -217,14 +280,18 @@ public class TestRouterHandlersFairness { if (isConcurrent) { LOG.info("Release fanout lock that was taken before test"); // take the lock for concurrent NS to block fanout calls - routerContext.getRouter().getRpcServer() - .getRPCClient().getRouterRpcFairnessPolicyController() - .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS); - } else { - for (String ns : cluster.getNameservices()) { + for(int i = 0; i < expectedHandlerPerNs.get(CONCURRENT_NS); i++) { routerContext.getRouter().getRpcServer() .getRPCClient().getRouterRpcFairnessPolicyController() - .releasePermit(ns); + .releasePermit(CONCURRENT_NS); + } + } else { + for (String ns : cluster.getNameservices()) { + for(int i = 0; i < expectedHandlerPerNs.get(ns); i++) { + routerContext.getRouter().getRpcServer() + .getRPCClient().getRouterRpcFairnessPolicyController() + .releasePermit(ns); + } } } } else { @@ -260,7 +327,7 @@ public class TestRouterHandlersFairness { .getRejectedPermitForNs(ns); } totalRejectedPermits += routerContext.getRouterRpcClient() - .getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS); + .getRejectedPermitForNs(CONCURRENT_NS); return totalRejectedPermits; } @@ -271,7 +338,7 @@ public class TestRouterHandlersFairness { .getAcceptedPermitForNs(ns); } totalAcceptedPermits += routerContext.getRouterRpcClient() - .getAcceptedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS); + .getAcceptedPermitForNs(CONCURRENT_NS); return totalAcceptedPermits; } @@ -308,4 +375,36 @@ public class TestRouterHandlersFairness { overloadException.get(); } } + + private static Map<String, Integer> expectedHandlerPerNs(String str) { + Map<String, Integer> handlersPerNsMap = new HashMap<>(); + if (str == null) { + return handlersPerNsMap; + } + String[] tmpStrs = str.split(", "); + for(String tmpStr : tmpStrs) { + String[] handlersPerNs = tmpStr.split(":"); + if (handlersPerNs.length != 2) { + continue; + } + handlersPerNsMap.put(handlersPerNs[0], Integer.valueOf(handlersPerNs[1])); + } + return handlersPerNsMap; + } + + private static Map<String, String> setConfiguration(String str) { + Map<String, String> conf = new HashMap<>(); + if (str == null) { + return conf; + } + String[] tmpStrs = str.split(", "); + for(String tmpStr : tmpStrs) { + String[] configKV = tmpStr.split("="); + if (configKV.length != 2) { + continue; + } + conf.put(configKV[0], configKV[1]); + } + return conf; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java index 50527b6573f9..964ed6ed6637 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java @@ -49,5 +49,7 @@ public class TestRBFConfigFields extends TestConfigurationFieldsBase { xmlPrefixToSkipCompare = new HashSet<String>(); xmlPrefixToSkipCompare.add( RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX); + xmlPrefixToSkipCompare.add( + RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_PROPORTION_KEY_PREFIX); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org