Repository: hadoop
Updated Branches:
  refs/heads/trunk 83798f15f -> 850b2f256
HDFS-1686. Federation: Add more Balancer tests with federation setting. Contributed by Bharat Viswanadham Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/850b2f25 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/850b2f25 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/850b2f25 Branch: refs/heads/trunk Commit: 850b2f256722a360d1378b505832cd99c4c5a686 Parents: 83798f1 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Fri Mar 2 10:42:39 2018 -0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Fri Mar 2 10:42:39 2018 -0800 ---------------------------------------------------------------------- .../TestBalancerWithMultipleNameNodes.java | 165 +++++++++++++++---- 1 file changed, 133 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/850b2f25/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index f01d79e..cf4c86f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -100,6 +100,19 @@ public class TestBalancerWithMultipleNameNodes { replication = 1; this.parameters = parameters; } + + Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, + BalancerParameters parameters, Configuration conf, short + replicationFactor) 
throws IOException { + this.conf = conf; + this.cluster = cluster; + clients = new ClientProtocol[nNameNodes]; + for(int i = 0; i < nNameNodes; i++) { + clients[i] = cluster.getNameNode(i).getRpcServer(); + } + replication = replicationFactor; + this.parameters = parameters; + } } /* create a file with a length of <code>fileLen</code> */ @@ -154,7 +167,7 @@ public class TestBalancerWithMultipleNameNodes { static void runBalancer(Suite s, final long totalUsed, final long totalCapacity) throws Exception { - final double avg = totalUsed*100.0/totalCapacity; + double avg = totalUsed*100.0/totalCapacity; LOG.info("BALANCER 0: totalUsed=" + totalUsed + ", totalCapacity=" + totalCapacity @@ -180,6 +193,9 @@ public class TestBalancerWithMultipleNameNodes { for(boolean balanced = false; !balanced; i++) { final long[] used = new long[s.cluster.getDataNodes().size()]; final long[] cap = new long[used.length]; + final long[][] bpUsed = new long[s.clients.length][s.cluster + .getDataNodes().size()]; + for(int n = 0; n < s.clients.length; n++) { final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport( @@ -199,25 +215,52 @@ public class TestBalancerWithMultipleNameNodes { Assert.assertEquals(used[d], datanodes[d].getDfsUsed()); Assert.assertEquals(cap[d], datanodes[d].getCapacity()); } + bpUsed[n][d] = datanodes[d].getBlockPoolUsed(); } } + + balanced = true; for(int d = 0; d < used.length; d++) { - final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + s.parameters.getThreshold(); - if (!balanced) { - if (i % 100 == 0) { - LOG.warn("datanodes " + d + " is not yet balanced: " - + "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg); - LOG.warn("TestBalancer.sum(used)=" + TestBalancer.sum(used) - + ", TestBalancer.sum(cap)=" + TestBalancer.sum(cap)); + double p; + if(s.parameters.getBalancingPolicy() == BalancingPolicy.Pool.INSTANCE) { + for (int k = 0; k < s.parameters.getBlockPools().size(); k++) { + avg = TestBalancer.sum(bpUsed[k])*100/totalCapacity; + p = 
bpUsed[k][d] * 100.0 / cap[d]; + balanced = p <= avg + s.parameters.getThreshold(); + if (!balanced) { + if (i % 100 == 0) { + LOG.warn("datanodes " + d + " is not yet balanced: " + + "block pool used=" + bpUsed[d][k] + ", cap=" + cap[d] + + ", avg=" + avg); + LOG.warn("sum(blockpoolUsed)=" + TestBalancer.sum(bpUsed[k]) + + ", sum(cap)=" + TestBalancer.sum(cap)); + } + sleep(100); + break; + } + } + if (!balanced) { + break; + } + } else { + p = used[d] * 100.0 / cap[d]; + balanced = p <= avg + s.parameters.getThreshold(); + if (!balanced) { + if (i % 100 == 0) { + LOG.warn("datanodes " + d + " is not yet balanced: " + + "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg); + LOG.warn("sum(used)=" + TestBalancer.sum(used) + + ", sum(cap)=" + TestBalancer.sum(cap)); + } + sleep(100); + break; } - sleep(100); - break; } } } + LOG.info("BALANCER 6"); // cluster is balanced, verify that only selected blockpools were touched Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages = @@ -425,19 +468,26 @@ public class TestBalancerWithMultipleNameNodes { * It then adds an empty node and start balancing. 
* * @param nNameNodes Number of NameNodes - * @param capacities Capacities of the datanodes * @param racks Rack names - * @param newCapacity the capacity of the new DataNode * @param newRack the rack for the new DataNode * @param conf Configuration + * @param nNameNodestoBalance noOfNameNodestoBalance + * @param balancerParameters BalancerParameters */ - private void runTest(final int nNameNodes, long[] capacities, String[] racks, - long newCapacity, String newRack, Configuration conf) throws Exception { - final int nDataNodes = capacities.length; + private void runTest(final int nNameNodes, String[] racks, + String[] newRack, Configuration conf, + int nNameNodestoBalance, + BalancerParameters balancerParameters) + throws Exception { + final int nDataNodes = racks.length; + final long[] capacities = new long[nDataNodes]; + Arrays.fill(capacities, CAPACITY); LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes); Assert.assertEquals(nDataNodes, racks.length); - LOG.info("RUN_TEST -1"); + LOG.info("RUN_TEST -1: start a cluster with nNameNodes=" + nNameNodes + + ", nDataNodes=" + nDataNodes); + final MiniDFSCluster cluster = new MiniDFSCluster .Builder(new Configuration(conf)) .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes)) @@ -451,44 +501,63 @@ public class TestBalancerWithMultipleNameNodes { try { cluster.waitActive(); LOG.info("RUN_TEST 1"); - final Suite s = - new Suite(cluster, nNameNodes, nDataNodes, - BalancerParameters.DEFAULT, conf); + + Suite s; + + Set<String> blockpools = new HashSet<>(); + if(balancerParameters == null) { + s = new Suite(cluster, nNameNodes, nDataNodes, + BalancerParameters.DEFAULT, conf); + + } else { + for (int i=0; i< nNameNodestoBalance; i++) { + blockpools.add(cluster.getNamesystem(i).getBlockPoolId()); + } + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(balancerParameters.getBalancingPolicy()); + b.setBlockpools(blockpools); + BalancerParameters params = 
b.build(); + s = new Suite(cluster, nNameNodes, nDataNodes, params, conf, (short)2); + } long totalCapacity = TestBalancer.sum(capacities); - LOG.info("RUN_TEST 2"); + LOG.info("RUN_TEST 2: create files"); // fill up the cluster to be 30% full - final long totalUsed = totalCapacity*3/10; + final long totalUsed = (totalCapacity * s.replication)*3/10; final long size = (totalUsed/nNameNodes)/s.replication; for(int n = 0; n < nNameNodes; n++) { createFile(s, n, size); } - LOG.info("RUN_TEST 3"); + LOG.info("RUN_TEST 3: " + newRack.length + " new datanodes"); // start up an empty node with the same capacity and on the same rack - cluster.startDataNodes(conf, 1, true, null, - new String[]{newRack}, new long[]{newCapacity}); + final long[] newCapacity = new long[newRack.length]; + Arrays.fill(newCapacity, CAPACITY); + cluster.startDataNodes(conf, newCapacity.length, true, null, + newRack, newCapacity); - totalCapacity += newCapacity; + totalCapacity += TestBalancer.sum(newCapacity); - LOG.info("RUN_TEST 4"); + LOG.info("RUN_TEST 4: run Balancer"); // run RUN_TEST and validate results runBalancer(s, totalUsed, totalCapacity); LOG.info("RUN_TEST 5"); } finally { cluster.shutdown(); } - LOG.info("RUN_TEST 6"); + LOG.info("RUN_TEST 6: done"); } + /** Test a cluster with even distribution, - * then a new empty node is added to the cluster + * then a new empty node is added to the cluster. 
*/ @Test - public void testBalancer() throws Exception { + public void testTwoOneOne() throws Exception { final Configuration conf = createConf(); - runTest(2, new long[]{CAPACITY}, new String[]{RACK0}, - CAPACITY/2, RACK0, conf); + runTest(2, new String[]{RACK0}, new String[] {RACK0}, conf, + 2, null); } /** Test unevenly distributed cluster */ @@ -517,4 +586,36 @@ public class TestBalancerWithMultipleNameNodes { 5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY, CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf); } + + /** Even distribution with 2 Namenodes, 4 Datanodes and 2 new Datanodes. */ + @Test(timeout = 600000) + public void testTwoFourTwo() throws Exception { + final Configuration conf = createConf(); + runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1}, + new String[]{RACK2, RACK2}, conf, 2, null); + } + + @Test(timeout=600000) + public void testBalancingBlockpoolsWithBlockPoolPolicy() throws Exception { + final Configuration conf = createConf(); + BalancerParameters balancerParameters = new BalancerParameters.Builder() + .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build(); + runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1}, + new String[]{RACK2, RACK2}, conf, 2, + balancerParameters); + } + + @Test(timeout = 600000) + public void test1OutOf2BlockpoolsWithBlockPoolPolicy() + throws + Exception { + final Configuration conf = createConf(); + BalancerParameters balancerParameters = new BalancerParameters.Builder() + .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build(); + runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1}, + new String[]{RACK2, RACK2}, conf, 1, + balancerParameters); + } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org